Repository: spark
Updated Branches:
  refs/heads/master 606ae491e -> 40de176c9


[SPARK-16496][SQL] Add wholetext as option for reading text in SQL.

## What changes were proposed in this pull request?

In many text analysis problems, it is often not desirable for rows to be
split on "\n". The RDD API already provides a whole-text reader
(SparkContext.wholeTextFiles), and this JIRA adds the same support to the Dataset API.
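
As a minimal usage sketch (the input path is illustrative, not part of this patch):

```scala
// Existing default behaviour: one row per "\n"-delimited line.
val perLine = spark.read.text("/data/docs")

// With the new option: one row per file; newlines are preserved inside "value".
val perFile = spark.read.option("wholetext", true).text("/data/docs")

// Existing RDD API counterpart: (path, content) pairs, one per file.
val rdd = spark.sparkContext.wholeTextFiles("/data/docs")
```
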
## How was this patch tested?

Added relevant new tests for both the Scala and Java APIs.

Author: Prashant Sharma <prash...@in.ibm.com>
Author: Prashant Sharma <prash...@apache.org>

Closes #14151 from ScrapCodes/SPARK-16496/wholetext.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40de176c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40de176c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40de176c

Branch: refs/heads/master
Commit: 40de176c93c5aa05bcbb1328721118b6b46ba51d
Parents: 606ae49
Author: Prashant Sharma <prash...@in.ibm.com>
Authored: Thu Dec 14 11:19:34 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Dec 14 11:19:34 2017 -0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |   7 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  16 ++-
 .../datasources/HadoopFileWholeTextReader.scala |  57 ++++++++++
 .../datasources/text/TextFileFormat.scala       |  31 +++++-
 .../datasources/text/TextOptions.scala          |   7 ++
 .../execution/datasources/text/TextSuite.scala  |   5 +-
 .../datasources/text/WholeTextFileSuite.scala   | 108 +++++++++++++++++++
 7 files changed, 221 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1ad974e..4e58bfb 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -304,7 +304,7 @@ class DataFrameReader(OptionUtils):
 
     @ignore_unicode_prefix
     @since(1.6)
-    def text(self, paths):
+    def text(self, paths, wholetext=False):
         """
         Loads text files and returns a :class:`DataFrame` whose schema starts with a
         string column named "value", and followed by partitioned columns if there
@@ -313,11 +313,16 @@ class DataFrameReader(OptionUtils):
         Each line in the text file is a new row in the resulting DataFrame.
 
         :param paths: string, or list of strings, for input path(s).
+        :param wholetext: if true, read each file from input path(s) as a single row.
 
         >>> df = spark.read.text('python/test_support/sql/text-test.txt')
         >>> df.collect()
         [Row(value=u'hello'), Row(value=u'this')]
+        >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
+        >>> df.collect()
+        [Row(value=u'hello\\nthis')]
         """
+        self._set_opts(wholetext=wholetext)
         if isinstance(paths, basestring):
             paths = [paths]
         return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))

http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ea1cf66..39fec8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -646,7 +646,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * Loads text files and returns a `DataFrame` whose schema starts with a string column named
    * "value", and followed by partitioned columns if there are any.
    *
-   * Each line in the text files is a new row in the resulting DataFrame. For example:
+   * You can set the following text-specific option(s) for reading text files:
+   * <ul>
+   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+   * </li>
+   * </ul>
+   * By default, each line in the text files is a new row in the resulting DataFrame.
+   *
+   * Usage example:
    * {{{
    *   // Scala:
    *   spark.read.text("/path/to/spark/README.md")
@@ -678,7 +685,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * If the directory structure of the text files contains partitioning information, those are
   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
    *
-   * Each line in the text files is a new element in the resulting Dataset. For example:
+   * You can set the following textFile-specific option(s) for reading text files:
+   * <ul>
+   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+   * </li>
+   * </ul>
+   * By default, each line in the text files is a new element in the resulting Dataset. For example:
    * {{{
    *   // Scala:
    *   spark.read.textFile("/path/to/spark/README.md")

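For reference, a short sketch of the two reader entry points whose documentation changes above (the README path follows the existing scaladoc examples; not part of the patch):

```scala
// text(): DataFrame with a single "value" column; partition columns, if any, are kept.
val df = spark.read.option("wholetext", true).text("/path/to/spark/README.md")

// textFile(): Dataset[String]; the same option applies, partitioning information is ignored.
val ds = spark.read.option("wholetext", true).textFile("/path/to/spark/README.md")
```
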
http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
new file mode 100644
index 0000000..c61a89e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.Closeable
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.input.WholeTextFileRecordReader
+
+/**
+ * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which is all of the lines
+ * in that file.
+ */
+class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration)
+  extends Iterator[Text] with Closeable {
+  private val iterator = {
+    val fileSplit = new CombineFileSplit(
+      Array(new Path(new URI(file.filePath))),
+      Array(file.start),
+      Array(file.length),
+      // TODO: Implement Locality
+      Array.empty[String])
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+    val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+    val reader = new WholeTextFileRecordReader(fileSplit, hadoopAttemptContext, 0)
+    reader.initialize(fileSplit, hadoopAttemptContext)
+    new RecordReaderIterator(reader)
+  }
+
+  override def hasNext: Boolean = iterator.hasNext
+
+  override def next(): Text = iterator.next()
+
+  override def close(): Unit = iterator.close()
+}

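For context, a simplified sketch of how a reader like this is consumed; it paraphrases the TextFileFormat change that follows and is not part of the patch (the helper name readFile and its parameters are stand-ins for values available at that call site):

```scala
import java.io.Closeable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text

import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, HadoopFileWholeTextReader, PartitionedFile}
import org.apache.spark.unsafe.types.UTF8String

// Sketch: pick a per-file reader based on the wholetext flag, register cleanup,
// and turn each Text record into Spark's internal string representation.
def readFile(file: PartitionedFile, conf: Configuration, wholeTextMode: Boolean): Iterator[UTF8String] = {
  val reader: Iterator[Text] with Closeable =
    if (wholeTextMode) new HadoopFileWholeTextReader(file, conf) // a single Text holding the whole file
    else new HadoopFileLinesReader(file, conf)                   // one Text per line
  // Close the underlying record reader when the task finishes.
  Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
  reader.map(t => UTF8String.fromBytes(t.getBytes, 0, t.getLength))
}
```
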
http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index d069044..8a6ab30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,12 +17,16 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import java.io.Closeable
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
@@ -53,6 +57,14 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    val textOptions = new TextOptions(options)
+    super.isSplitable(sparkSession, options, path) && !textOptions.wholeText
+  }
+
   override def inferSchema(
       sparkSession: SparkSession,
       options: Map[String, String],
@@ -97,14 +109,25 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     assert(
       requiredSchema.length <= 1,
       "Text data source only produces a single data column named \"value\".")
-
+    val textOptions = new TextOptions(options)
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
+    readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText)
+  }
+
+  private def readToUnsafeMem(conf: Broadcast[SerializableConfiguration],
+      requiredSchema: StructType, wholeTextMode: Boolean):
+  (PartitionedFile) => Iterator[UnsafeRow] = {
+
     (file: PartitionedFile) => {
-      val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+      val confValue = conf.value.value
+      val reader = if (!wholeTextMode) {
+        new HadoopFileLinesReader(file, confValue)
+      } else {
+        new HadoopFileWholeTextReader(file, confValue)
+      }
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
-
       if (requiredSchema.isEmpty) {
         val emptyUnsafeRow = new UnsafeRow(0)
         reader.map(_ => emptyUnsafeRow)

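The isSplitable override above is what the gzip/maxPartitionBytes test later in this commit exercises; a sketch of the observable effect (the path is illustrative, not from the patch):

```scala
// Even with a very small target partition size, a wholetext file is never split,
// so each input file still comes back as exactly one row with its newlines intact.
spark.conf.set("spark.sql.files.maxPartitionBytes", 10L)
val rows = spark.read.option("wholetext", true).text("/data/docs").collect()
// rows.length == number of input files
```
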
http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index 49bd738..2a66156 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -33,8 +33,15 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    * Compression codec to use.
    */
   val compressionCodec = parameters.get(COMPRESSION).map(CompressionCodecs.getCodecClassName)
+
+  /**
+   * wholetext - If true, read a file as a single row and not split by "\n".
+   */
+  val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
+
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
+  val WHOLETEXT = "wholetext"
 }

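A small sketch of how the option value is resolved; TextOptions is an internal (private[text]) class, so this is purely illustrative, assuming the Map-based constructor that TextFileFormat uses above:

```scala
// Option keys are case-insensitive; the value goes through String.toBoolean,
// so only "true"/"false" (in any casing) are accepted.
val opts = new TextOptions(Map("WholeText" -> "true"))
assert(opts.wholeText)

val defaults = new TextOptions(Map.empty[String, String])
assert(!defaults.wholeText) // wholetext defaults to false
```
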
http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index cb7393c..3328704 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -185,10 +185,9 @@ class TextSuite extends QueryTest with SharedSQLContext {
     val data = df.collect()
     assert(data(0) == Row("This is a test file for the text data source"))
     assert(data(1) == Row("1+1"))
-    // non ascii characters are not allowed in the code, so we disable the scalastyle here.
-    // scalastyle:off
+    // scalastyle:off nonascii
     assert(data(2) == Row("数据砖头"))
-    // scalastyle:on
+    // scalastyle:on nonascii
     assert(data(3) == Row("\"doh\""))
     assert(data.length == 4)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/40de176c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
new file mode 100644
index 0000000..8bd736b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.text
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class WholeTextFileSuite extends QueryTest with SharedSQLContext {
+
+  // Hadoop's FileSystem caching does not use the Configuration as part of its cache key, which
+  // can cause Filesystem.get(Configuration) to return a cached instance created with a different
+  // configuration than the one passed to get() (see HADOOP-8490 for more details). This caused
+  // hard-to-reproduce test failures, since any suites that were run after this one would inherit
+  // the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this,
+  // we disable FileSystem caching in this suite.
+  protected override def sparkConf =
+    super.sparkConf.set("spark.hadoop.fs.file.impl.disable.cache", "true")
+
+  private def testFile: String = {
+    Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString
+  }
+
+  test("reading text file with option wholetext=true") {
+    val df = spark.read.option("wholetext", "true")
+      .format("text").load(testFile)
+    // schema
+    assert(df.schema == new StructType().add("value", StringType))
+
+    // verify content
+    val data = df.collect()
+    assert(data(0) ==
+      Row(
+        // scalastyle:off nonascii
+        """This is a test file for the text data source
+          |1+1
+          |数据砖头
+          |"doh"
+          |""".stripMargin))
+    // scalastyle:on nonascii
+    assert(data.length == 1)
+  }
+
+  test("correctness of wholetext option") {
+    import org.apache.spark.sql.catalyst.util._
+    withTempDir { dir =>
+      val file1 = new File(dir, "text1.txt")
+      stringToFile(file1,
+        """text file 1 contents.
+          |From: None to: ??
+        """.stripMargin)
+      val file2 = new File(dir, "text2.txt")
+      stringToFile(file2, "text file 2 contents.")
+      val file3 = new File(dir, "text3.txt")
+      stringToFile(file3, "text file 3 contents.")
+      val df = spark.read.option("wholetext", "true").text(dir.getAbsolutePath)
+      // Since the wholetext option reads each file into a single row, the number of rows
+      // should equal the number of files.
+      val data = df.sort("value").collect()
+      assert(data.length == 3)
+      // Each file should be represented by a single Row/element in the DataFrame/Dataset.
+      assert(data(0) == Row(
+        """text file 1 contents.
+          |From: None to: ??
+        """.stripMargin))
+      assert(data(1) == Row(
+        """text file 2 contents.""".stripMargin))
+      assert(data(2) == Row(
+        """text file 3 contents.""".stripMargin))
+    }
+  }
+
+
+  test("Correctness of wholetext option with gzip compression mode.") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s").repartition(1)
+      df1.write.option("compression", "gzip").mode("overwrite").text(path)
+      // When read in wholetext mode, each file becomes a single row, i.e. its contents are
+      // not split on the newline character.
+      val expected = Row(Range(0, 1000).mkString("", "\n", "\n"))
+      Seq(10, 100, 1000).foreach { bytes =>
+        withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> bytes.toString) {
+          val df2 = spark.read.option("wholetext", "true").format("text").load(path)
+          val result = df2.collect().head
+          assert(result === expected)
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
