Repository: spark Updated Branches: refs/heads/master cfcd74668 -> ffe6fd77a
[SPARK-22818][SQL] csv escape of quote escape ## What changes were proposed in this pull request? Escape of escape should be considered when using the UniVocity csv encoding/decoding library. Ref: https://github.com/uniVocity/univocity-parsers#escaping-quote-escape-characters One option is added for reading and writing CSV: `escapeQuoteEscaping` ## How was this patch tested? Unit test added. Author: soonmok-kwon <soonmok.k...@navercorp.com> Closes #20004 from ep1804/SPARK-22818. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffe6fd77 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffe6fd77 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffe6fd77 Branch: refs/heads/master Commit: ffe6fd77a42043db8ebaf43d98059dcd28a53f1e Parents: cfcd746 Author: soonmok-kwon <soonmok.k...@navercorp.com> Authored: Fri Dec 29 07:30:06 2017 +0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Fri Dec 29 07:30:06 2017 +0800 ---------------------------------------------------------------------- python/pyspark/sql/readwriter.py | 33 +++++++++++++------- python/pyspark/sql/streaming.py | 17 ++++++---- .../org/apache/spark/sql/DataFrameReader.scala | 11 ++++--- .../org/apache/spark/sql/DataFrameWriter.scala | 9 ++++-- .../execution/datasources/csv/CSVOptions.scala | 10 ++++++ .../spark/sql/streaming/DataStreamReader.scala | 11 ++++--- .../execution/datasources/csv/CSVSuite.scala | 31 ++++++++++++++++++ 7 files changed, 94 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 4e58bfb..49af1bc 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -333,7 +333,7 @@ class 
DataFrameReader(OptionUtils): ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, - columnNameOfCorruptRecord=None, multiLine=None): + columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -344,17 +344,17 @@ class DataFrameReader(OptionUtils): or RDD of Strings storing CSV rows. :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``). - :param sep: sets the single character as a separator for each field and value. + :param sep: sets a single character as a separator for each field and value. If None is set, it uses the default value, ``,``. :param encoding: decodes the CSV files by the given encoding type. If None is set, it uses the default value, ``UTF-8``. - :param quote: sets the single character used for escaping quoted values where the + :param quote: sets a single character used for escaping quoted values where the separator can be part of the value. If None is set, it uses the default value, ``"``. If you would like to turn off quotations, you need to set an empty string. - :param escape: sets the single character used for escaping quotes inside an already + :param escape: sets a single character used for escaping quotes inside an already quoted value. If None is set, it uses the default value, ``\``. - :param comment: sets the single character used for skipping lines beginning with this + :param comment: sets a single character used for skipping lines beginning with this character. By default (None), it is disabled. :param header: uses the first line as names of columns. 
If None is set, it uses the default value, ``false``. @@ -410,6 +410,10 @@ class DataFrameReader(OptionUtils): ``spark.sql.columnNameOfCorruptRecord``. :param multiLine: parse records, which may span multiple lines. If None is set, it uses the default value, ``false``. + :param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for + the quote character. If None is set, the default value is + escape character when escape and quote characters are + different, ``\0`` otherwise. >>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes @@ -427,7 +431,8 @@ class DataFrameReader(OptionUtils): dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, - columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine) + columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine, + charToEscapeQuoteEscaping=charToEscapeQuoteEscaping) if isinstance(path, basestring): path = [path] if type(path) == list: @@ -814,7 +819,8 @@ class DataFrameWriter(OptionUtils): @since(2.0) def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, - timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None): + timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, + charToEscapeQuoteEscaping=None): """Saves the content of the :class:`DataFrame` in CSV format at the specified path. :param path: the path in any Hadoop supported file system @@ -829,12 +835,12 @@ class DataFrameWriter(OptionUtils): :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate). - :param sep: sets the single character as a separator for each field and value. 
If None is + :param sep: sets a single character as a separator for each field and value. If None is set, it uses the default value, ``,``. - :param quote: sets the single character used for escaping quoted values where the + :param quote: sets a single character used for escaping quoted values where the separator can be part of the value. If None is set, it uses the default value, ``"``. If an empty string is set, it uses ``u0000`` (null character). - :param escape: sets the single character used for escaping quotes inside an already + :param escape: sets a single character used for escaping quotes inside an already quoted value. If None is set, it uses the default value, ``\`` :param escapeQuotes: a flag indicating whether values containing quotes should always be enclosed in quotes. If None is set, it uses the default value @@ -860,6 +866,10 @@ class DataFrameWriter(OptionUtils): :param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from values being written should be skipped. If None is set, it uses the default value, ``true``. + :param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for + the quote character. If None is set, the default value is + escape character when escape and quote characters are + different, ``\0`` otherwise. 
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ @@ -868,7 +878,8 @@ class DataFrameWriter(OptionUtils): nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll, dateFormat=dateFormat, timestampFormat=timestampFormat, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, - ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace) + ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, + charToEscapeQuoteEscaping=charToEscapeQuoteEscaping) self._jwrite.csv(path) @since(1.5) http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/python/pyspark/sql/streaming.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index d0aba28..fb228f9 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -560,7 +560,7 @@ class DataStreamReader(OptionUtils): ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, - columnNameOfCorruptRecord=None, multiLine=None): + columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -572,17 +572,17 @@ class DataStreamReader(OptionUtils): :param path: string, or list of strings, for input path(s). :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``). - :param sep: sets the single character as a separator for each field and value. + :param sep: sets a single character as a separator for each field and value. If None is set, it uses the default value, ``,``. :param encoding: decodes the CSV files by the given encoding type. 
If None is set, it uses the default value, ``UTF-8``. - :param quote: sets the single character used for escaping quoted values where the + :param quote: sets a single character used for escaping quoted values where the separator can be part of the value. If None is set, it uses the default value, ``"``. If you would like to turn off quotations, you need to set an empty string. - :param escape: sets the single character used for escaping quotes inside an already + :param escape: sets a single character used for escaping quotes inside an already quoted value. If None is set, it uses the default value, ``\``. - :param comment: sets the single character used for skipping lines beginning with this + :param comment: sets a single character used for skipping lines beginning with this character. By default (None), it is disabled. :param header: uses the first line as names of columns. If None is set, it uses the default value, ``false``. @@ -638,6 +638,10 @@ class DataStreamReader(OptionUtils): ``spark.sql.columnNameOfCorruptRecord``. :param multiLine: parse one record, which may span multiple lines. If None is set, it uses the default value, ``false``. + :param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for + the quote character. If None is set, the default value is + escape character when escape and quote characters are + different, ``\0`` otherwise. 
>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema) >>> csv_sdf.isStreaming @@ -653,7 +657,8 @@ class DataStreamReader(OptionUtils): dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, - columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine) + columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine, + charToEscapeQuoteEscaping=charToEscapeQuoteEscaping) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) else: http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index c43ee91..e8d683a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -517,17 +517,20 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * You can set the following CSV-specific options to deal with CSV files: * <ul> - * <li>`sep` (default `,`): sets the single character as a separator for each + * <li>`sep` (default `,`): sets a single character as a separator for each * field and value.</li> * <li>`encoding` (default `UTF-8`): decodes the CSV files by the given encoding * type.</li> - * <li>`quote` (default `"`): sets the single character used for escaping quoted values where + * <li>`quote` (default `"`): sets a single character used for escaping quoted values where * the separator can be part of the value. If you would like to turn off quotations, you need to * set not `null` but an empty string. 
This behaviour is different from * `com.databricks.spark.csv`.</li> - * <li>`escape` (default `\`): sets the single character used for escaping quotes inside + * <li>`escape` (default `\`): sets a single character used for escaping quotes inside * an already quoted value.</li> - * <li>`comment` (default empty string): sets the single character used for skipping lines + * <li>`charToEscapeQuoteEscaping` (default `escape` or `\0`): sets a single character used for + * escaping the escape for the quote character. The default value is escape character when escape + * and quote characters are different, `\0` otherwise.</li> + * <li>`comment` (default empty string): sets a single character used for skipping lines * beginning with this character. By default, it is disabled.</li> * <li>`header` (default `false`): uses the first line as names of columns.</li> * <li>`inferSchema` (default `false`): infers the input schema automatically from data. It http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 7ccda0a..bd216ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -594,13 +594,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * * You can set the following CSV-specific option(s) for writing CSV files: * <ul> - * <li>`sep` (default `,`): sets the single character as a separator for each + * <li>`sep` (default `,`): sets a single character as a separator for each * field and value.</li> - * <li>`quote` (default `"`): sets the single character used for escaping quoted values where + * <li>`quote` (default `"`): sets a single character used for escaping quoted 
values where * the separator can be part of the value. If an empty string is set, it uses `u0000` * (null character).</li> - * <li>`escape` (default `\`): sets the single character used for escaping quotes inside + * <li>`escape` (default `\`): sets a single character used for escaping quotes inside * an already quoted value.</li> + * <li>`charToEscapeQuoteEscaping` (default `escape` or `\0`): sets a single character used for + * escaping the escape for the quote character. The default value is escape character when escape + * and quote characters are different, `\0` otherwise.</li> * <li>`escapeQuotes` (default `true`): a flag indicating whether values containing * quotes should always be enclosed in quotes. Default is to escape all values containing * a quote character.</li> http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index a13a5a3..c167906 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -89,6 +89,14 @@ class CSVOptions( val quote = getChar("quote", '\"') val escape = getChar("escape", '\\') + val charToEscapeQuoteEscaping = parameters.get("charToEscapeQuoteEscaping") match { + case None => None + case Some(null) => None + case Some(value) if value.length == 0 => None + case Some(value) if value.length == 1 => Some(value.charAt(0)) + case _ => + throw new RuntimeException("charToEscapeQuoteEscaping cannot be more than one character") + } val comment = getChar("comment", '\u0000') val headerFlag = getBool("header") @@ -148,6 +156,7 @@ class CSVOptions( 
format.setDelimiter(delimiter) format.setQuote(quote) format.setQuoteEscape(escape) + charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping) format.setComment(comment) writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite) writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite) @@ -165,6 +174,7 @@ class CSVOptions( format.setDelimiter(delimiter) format.setQuote(quote) format.setQuoteEscape(escape) + charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping) format.setComment(comment) settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead) settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead) http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index acd5ca1..2e92bee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -262,17 +262,20 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * <ul> * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be * considered in every trigger.</li> - * <li>`sep` (default `,`): sets the single character as a separator for each + * <li>`sep` (default `,`): sets a single character as a separator for each * field and value.</li> * <li>`encoding` (default `UTF-8`): decodes the CSV files by the given encoding * type.</li> - * <li>`quote` (default `"`): sets the single character used for escaping quoted values where + * <li>`quote` (default `"`): sets a single character used for escaping quoted values where * the separator 
can be part of the value. If you would like to turn off quotations, you need to * set not `null` but an empty string. This behaviour is different from * `com.databricks.spark.csv`.</li> - * <li>`escape` (default `\`): sets the single character used for escaping quotes inside + * <li>`escape` (default `\`): sets a single character used for escaping quotes inside * an already quoted value.</li> - * <li>`comment` (default empty string): sets the single character used for skipping lines + * <li>`charToEscapeQuoteEscaping` (default `escape` or `\0`): sets a single character used for + * escaping the escape for the quote character. The default value is escape character when escape + * and quote characters are different, `\0` otherwise.</li> + * <li>`comment` (default empty string): sets a single character used for skipping lines * beginning with this character. By default, it is disabled.</li> * <li>`header` (default `false`): uses the first line as names of columns.</li> * <li>`inferSchema` (default `false`): infers the input schema automatically from data. 
It http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 4fe4542..4398e54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -482,6 +482,37 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } + test("save csv with quote escaping, using charToEscapeQuoteEscaping option") { + withTempPath { path => + + // original text + val df1 = Seq( + """You are "beautiful"""", + """Yes, \"in the inside"\""" + ).toDF() + + // text written in CSV with following options: + // quote character: " + // escape character: \ + // character to escape quote escaping: # + val df2 = Seq( + """"You are \"beautiful\""""", + """"Yes, #\\"in the inside\"#\"""" + ).toDF() + + df2.coalesce(1).write.text(path.getAbsolutePath) + + val df3 = spark.read + .format("csv") + .option("quote", "\"") + .option("escape", "\\") + .option("charToEscapeQuoteEscaping", "#") + .load(path.getAbsolutePath) + + checkAnswer(df1, df3) + } + } + test("commented lines in CSV data") { Seq("false", "true").foreach { multiLine => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org