spark git commit: [SPARK-26198][SQL] Fix Metadata serialize null values throw NPE
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 0058986de -> 8236f6497

[SPARK-26198][SQL] Fix Metadata serialize null values throw NPE

How to reproduce this issue:

```scala
scala> val meta = new org.apache.spark.sql.types.MetadataBuilder().putNull("key").build().json
java.lang.NullPointerException
  at org.apache.spark.sql.types.Metadata$.org$apache$spark$sql$types$Metadata$$toJsonValue(Metadata.scala:196)
  at org.apache.spark.sql.types.Metadata$$anonfun$1.apply(Metadata.scala:180)
```

This PR fixes the `NullPointerException` thrown when `Metadata` serializes `null` values.

Tested with unit tests.

Closes #23164 from wangyum/SPARK-26198.

Authored-by: Yuming Wang
Signed-off-by: Sean Owen
(cherry picked from commit 676bbb2446af1f281b8f76a5428b7ba75b7588b3)
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8236f649
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8236f649
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8236f649

Branch: refs/heads/branch-2.3
Commit: 8236f649762fd953626e7d14eb5b5dd544ab38cc
Parents: 0058986
Author: Yuming Wang
Authored: Sun Dec 2 08:52:01 2018 -0600
Committer: Sean Owen
Committed: Sun Dec 2 23:44:00 2018 -0600

--
 .../src/main/scala/org/apache/spark/sql/types/Metadata.scala | 2 ++
 1 file changed, 2 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8236f649/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 352fb54..bcdc863 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -190,6 +190,8 @@ object Metadata {
         JBool(x)
       case x: String =>
         JString(x)
+      case null =>
+        JNull
       case x: Metadata =>
         toJsonValue(x.map)
       case other =>
spark git commit: [SPARK-26198][SQL] Fix Metadata serialize null values throw NPE
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 58a4c0ce7 -> 91b86b7f3

[SPARK-26198][SQL] Fix Metadata serialize null values throw NPE

## What changes were proposed in this pull request?

How to reproduce this issue:

```scala
scala> val meta = new org.apache.spark.sql.types.MetadataBuilder().putNull("key").build().json
java.lang.NullPointerException
  at org.apache.spark.sql.types.Metadata$.org$apache$spark$sql$types$Metadata$$toJsonValue(Metadata.scala:196)
  at org.apache.spark.sql.types.Metadata$$anonfun$1.apply(Metadata.scala:180)
```

This PR fixes the `NullPointerException` thrown when `Metadata` serializes `null` values.

## How was this patch tested?

Unit tests.

Closes #23164 from wangyum/SPARK-26198.

Authored-by: Yuming Wang
Signed-off-by: Sean Owen
(cherry picked from commit 676bbb2446af1f281b8f76a5428b7ba75b7588b3)
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91b86b7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91b86b7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91b86b7f

Branch: refs/heads/branch-2.4
Commit: 91b86b7f3fdd87c2c95603a53df5cc9f373c681c
Parents: 58a4c0c
Author: Yuming Wang
Authored: Sun Dec 2 08:52:01 2018 -0600
Committer: Sean Owen
Committed: Sun Dec 2 23:40:54 2018 -0600

--
 .../src/main/scala/org/apache/spark/sql/types/Metadata.scala  | 2 ++
 .../test/scala/org/apache/spark/sql/types/MetadataSuite.scala | 5 +++++
 2 files changed, 7 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/91b86b7f/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 7c15dc0..e79ab7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -190,6 +190,8 @@ object Metadata {
         JBool(x)
       case x: String =>
         JString(x)
+      case null =>
+        JNull
       case x: Metadata =>
         toJsonValue(x.map)
       case other =>

http://git-wip-us.apache.org/repos/asf/spark/blob/91b86b7f/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala
index 210e657..b4aeac5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala
@@ -26,6 +26,7 @@ class MetadataSuite extends SparkFunSuite {
     assert(meta.## !== 0)
     assert(meta.getString("key") === "value")
     assert(meta.contains("key"))
+    assert(meta === Metadata.fromJson(meta.json))
     intercept[NoSuchElementException](meta.getString("no_such_key"))
     intercept[ClassCastException](meta.getBoolean("key"))
   }
@@ -36,6 +37,7 @@ class MetadataSuite extends SparkFunSuite {
     assert(meta.## !== 0)
     assert(meta.getLong("key") === 12)
     assert(meta.contains("key"))
+    assert(meta === Metadata.fromJson(meta.json))
     intercept[NoSuchElementException](meta.getLong("no_such_key"))
     intercept[ClassCastException](meta.getBoolean("key"))
   }
@@ -46,6 +48,7 @@ class MetadataSuite extends SparkFunSuite {
     assert(meta.## !== 0)
     assert(meta.getDouble("key") === 12)
     assert(meta.contains("key"))
+    assert(meta === Metadata.fromJson(meta.json))
     intercept[NoSuchElementException](meta.getDouble("no_such_key"))
     intercept[ClassCastException](meta.getBoolean("key"))
   }
@@ -56,6 +59,7 @@ class MetadataSuite extends SparkFunSuite {
     assert(meta.## !== 0)
     assert(meta.getBoolean("key") === true)
     assert(meta.contains("key"))
+    assert(meta === Metadata.fromJson(meta.json))
     intercept[NoSuchElementException](meta.getBoolean("no_such_key"))
     intercept[ClassCastException](meta.getString("key"))
   }
@@ -69,6 +73,7 @@ class MetadataSuite extends SparkFunSuite {
     assert(meta.getLong("key") === 0)
     assert(meta.getBoolean("key") === false)
     assert(meta.contains("key"))
+    assert(meta === Metadata.fromJson(meta.json))
     intercept[NoSuchElementException](meta.getLong("no_such_key"))
   }
 }
[spark] Git Push Summary
Repository: spark
Updated Tags: refs/tags/v2.4.0 [created] 075447b39
[spark] Git Push Summary
Repository: spark
Updated Tags: refs/tags/v2.4.0 [deleted] 075447b39
svn commit: r31309 - in /dev/spark/3.0.0-SNAPSHOT-2018_12_02_10_08-676bbb2-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Sun Dec 2 18:20:48 2018
New Revision: 31309

Log: Apache Spark 3.0.0-SNAPSHOT-2018_12_02_10_08-676bbb2 docs

[This commit notification would consist of 1764 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-26198][SQL] Fix Metadata serialize null values throw NPE
Repository: spark
Updated Branches:
  refs/heads/master 9cda9a892 -> 676bbb244

[SPARK-26198][SQL] Fix Metadata serialize null values throw NPE

## What changes were proposed in this pull request?

How to reproduce this issue:

```scala
scala> val meta = new org.apache.spark.sql.types.MetadataBuilder().putNull("key").build().json
java.lang.NullPointerException
  at org.apache.spark.sql.types.Metadata$.org$apache$spark$sql$types$Metadata$$toJsonValue(Metadata.scala:196)
  at org.apache.spark.sql.types.Metadata$$anonfun$1.apply(Metadata.scala:180)
```

This PR fixes the `NullPointerException` thrown when `Metadata` serializes `null` values.

## How was this patch tested?

Unit tests.

Closes #23164 from wangyum/SPARK-26198.

Authored-by: Yuming Wang
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/676bbb24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/676bbb24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/676bbb24

Branch: refs/heads/master
Commit: 676bbb2446af1f281b8f76a5428b7ba75b7588b3
Parents: 9cda9a8
Author: Yuming Wang
Authored: Sun Dec 2 08:52:01 2018 -0600
Committer: Sean Owen
Committed: Sun Dec 2 08:52:01 2018 -0600

--
 .../src/main/scala/org/apache/spark/sql/types/Metadata.scala  | 2 ++
 .../test/scala/org/apache/spark/sql/types/MetadataSuite.scala | 5 +++++
 2 files changed, 7 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/676bbb24/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 4979ace..b6a859b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -190,6 +190,8 @@ object Metadata {
         JBool(x)
       case x: String =>
         JString(x)
+      case null =>
+        JNull
       case x: Metadata =>
         toJsonValue(x.map)
       case other =>

http://git-wip-us.apache.org/repos/asf/spark/blob/676bbb24/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala
index 210e657..b4aeac5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/MetadataSuite.scala
@@ -26,6 +26,7 @@ class MetadataSuite extends SparkFunSuite {
     assert(meta.## !== 0)
     assert(meta.getString("key") === "value")
     assert(meta.contains("key"))
+    assert(meta === Metadata.fromJson(meta.json))
     intercept[NoSuchElementException](meta.getString("no_such_key"))
     intercept[ClassCastException](meta.getBoolean("key"))
   }
@@ -36,6 +37,7 @@ class MetadataSuite extends SparkFunSuite {
     assert(meta.## !== 0)
     assert(meta.getLong("key") === 12)
     assert(meta.contains("key"))
+    assert(meta === Metadata.fromJson(meta.json))
     intercept[NoSuchElementException](meta.getLong("no_such_key"))
     intercept[ClassCastException](meta.getBoolean("key"))
   }
@@ -46,6 +48,7 @@ class MetadataSuite extends SparkFunSuite {
     assert(meta.## !== 0)
     assert(meta.getDouble("key") === 12)
     assert(meta.contains("key"))
+    assert(meta === Metadata.fromJson(meta.json))
     intercept[NoSuchElementException](meta.getDouble("no_such_key"))
     intercept[ClassCastException](meta.getBoolean("key"))
   }
@@ -56,6 +59,7 @@ class MetadataSuite extends SparkFunSuite {
     assert(meta.## !== 0)
     assert(meta.getBoolean("key") === true)
     assert(meta.contains("key"))
+    assert(meta === Metadata.fromJson(meta.json))
     intercept[NoSuchElementException](meta.getBoolean("no_such_key"))
     intercept[ClassCastException](meta.getString("key"))
   }
@@ -69,6 +73,7 @@ class MetadataSuite extends SparkFunSuite {
     assert(meta.getLong("key") === 0)
     assert(meta.getBoolean("key") === false)
     assert(meta.contains("key"))
+    assert(meta === Metadata.fromJson(meta.json))
     intercept[NoSuchElementException](meta.getLong("no_such_key"))
   }
 }
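[Editor's note] The NPE in the commits above originates in the catch-all arm of `toJsonValue`: a `null` value matches none of the typed patterns and falls through to `case other`, where `other.getClass` is dereferenced (Metadata.scala:196 in the stack trace). Below is a trimmed-down, standalone sketch of the null-safe match; only a few of the real arms are reproduced, and the error message is paraphrased rather than quoted from Spark's source.

```scala
import org.json4s._

// Trimmed sketch of Metadata's value-to-JSON conversion (not the full Spark code).
// Without the `case null` arm, null falls through to the catch-all, where
// `other.getClass` throws the NullPointerException seen in the stack trace.
def toJsonValue(obj: Any): JValue = obj match {
  case x: Long    => JInt(x)
  case x: Boolean => JBool(x)
  case x: String  => JString(x)
  case null       => JNull   // the fix: map null to JSON null
  case other      => throw new RuntimeException(s"Do not support type ${other.getClass}.")
}
```

With the arm in place, `new MetadataBuilder().putNull("key").build().json` should serialize to `{"key":null}`, which is what lets the new `Metadata.fromJson(meta.json)` round-trip assertions in MetadataSuite pass.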
svn commit: r31299 - in /dev/spark/2.4.1-SNAPSHOT-2018_12_02_04_05-58a4c0c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Sun Dec 2 12:20:12 2018
New Revision: 31299

Log: Apache Spark 2.4.1-SNAPSHOT-2018_12_02_04_05-58a4c0c docs

[This commit notification would consist of 1476 parts, which exceeds the limit of 50, so it was shortened to this summary.]
svn commit: r31298 - in /dev/spark/3.0.0-SNAPSHOT-2018_12_02_02_05-9cda9a8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Sun Dec 2 10:17:26 2018
New Revision: 31298

Log: Apache Spark 3.0.0-SNAPSHOT-2018_12_02_02_05-9cda9a8 docs

[This commit notification would consist of 1764 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-26080][PYTHON] Skips Python resource limit on Windows in Python worker
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 3ec03ecf2 -> 58a4c0ce7

[SPARK-26080][PYTHON] Skips Python resource limit on Windows in Python worker

## What changes were proposed in this pull request?

The `resource` package is a Unix-specific package. See https://docs.python.org/2/library/resource.html and https://docs.python.org/3/library/resource.html.

Note that we document Windows support:

> Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS).

This should be backported into branch-2.4 to restore Windows support in Spark 2.4.1.

## How was this patch tested?

Manually, by mocking the changed logic.

Closes #23055 from HyukjinKwon/SPARK-26080.

Lead-authored-by: hyukjinkwon
Co-authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
(cherry picked from commit 9cda9a892d03f60a76cd5d9b4546e72c50962c85)
Signed-off-by: Hyukjin Kwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/58a4c0ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/58a4c0ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/58a4c0ce

Branch: refs/heads/branch-2.4
Commit: 58a4c0ce7530577561be3c5106628dcad06eee18
Parents: 3ec03ec
Author: hyukjinkwon
Authored: Sun Dec 2 17:41:08 2018 +0800
Committer: Hyukjin Kwon
Committed: Sun Dec 2 17:41:25 2018 +0800

--
 docs/configuration.md    |  2 ++
 python/pyspark/worker.py | 19 ++++++++++++-------
 2 files changed, 14 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/58a4c0ce/docs/configuration.md

diff --git a/docs/configuration.md b/docs/configuration.md
index 944e5e4..042e57d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -190,6 +190,8 @@ of the most common options to set are:
   and it is up to the application to avoid exceeding the overhead memory space
   shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory
   is added to executor resource requests.
+
+  NOTE: Python memory usage may not be limited on platforms that do not support resource limiting, such as Windows.

http://git-wip-us.apache.org/repos/asf/spark/blob/58a4c0ce/python/pyspark/worker.py

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 8c59f1f..953b468 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -22,7 +22,12 @@ from __future__ import print_function
 import os
 import sys
 import time
-import resource
+# 'resource' is a Unix specific module.
+has_resource_module = True
+try:
+    import resource
+except ImportError:
+    has_resource_module = False
 import socket
 import traceback
@@ -268,9 +273,9 @@ def main(infile, outfile):

     # set up memory limits
     memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
-    total_memory = resource.RLIMIT_AS
-    try:
-        if memory_limit_mb > 0:
+    if memory_limit_mb > 0 and has_resource_module:
+        total_memory = resource.RLIMIT_AS
+        try:
             (soft_limit, hard_limit) = resource.getrlimit(total_memory)
             msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
             print(msg, file=sys.stderr)
@@ -283,9 +288,9 @@ def main(infile, outfile):
             print(msg, file=sys.stderr)
             resource.setrlimit(total_memory, (new_limit, new_limit))

-    except (resource.error, OSError, ValueError) as e:
-        # not all systems support resource limits, so warn instead of failing
-        print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)
+        except (resource.error, OSError, ValueError) as e:
+            # not all systems support resource limits, so warn instead of failing
+            print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)

     # initialize global state
     taskContext = None
spark git commit: [SPARK-26080][PYTHON] Skips Python resource limit on Windows in Python worker
Repository: spark
Updated Branches:
  refs/heads/master c7d95cced -> 9cda9a892

[SPARK-26080][PYTHON] Skips Python resource limit on Windows in Python worker

## What changes were proposed in this pull request?

The `resource` package is a Unix-specific package. See https://docs.python.org/2/library/resource.html and https://docs.python.org/3/library/resource.html.

Note that we document Windows support:

> Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS).

This should be backported into branch-2.4 to restore Windows support in Spark 2.4.1.

## How was this patch tested?

Manually, by mocking the changed logic.

Closes #23055 from HyukjinKwon/SPARK-26080.

Lead-authored-by: hyukjinkwon
Co-authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cda9a89
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cda9a89
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cda9a89

Branch: refs/heads/master
Commit: 9cda9a892d03f60a76cd5d9b4546e72c50962c85
Parents: c7d95cc
Author: hyukjinkwon
Authored: Sun Dec 2 17:41:08 2018 +0800
Committer: Hyukjin Kwon
Committed: Sun Dec 2 17:41:08 2018 +0800

--
 docs/configuration.md    |  2 ++
 python/pyspark/worker.py | 19 ++++++++++++-------
 2 files changed, 14 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/9cda9a89/docs/configuration.md

diff --git a/docs/configuration.md b/docs/configuration.md
index 8914bd0..9abbb3f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -190,6 +190,8 @@ of the most common options to set are:
   and it is up to the application to avoid exceeding the overhead memory space
   shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory
   is added to executor resource requests.
+
+  NOTE: Python memory usage may not be limited on platforms that do not support resource limiting, such as Windows.

http://git-wip-us.apache.org/repos/asf/spark/blob/9cda9a89/python/pyspark/worker.py

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 8c59f1f..953b468 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -22,7 +22,12 @@ from __future__ import print_function
 import os
 import sys
 import time
-import resource
+# 'resource' is a Unix specific module.
+has_resource_module = True
+try:
+    import resource
+except ImportError:
+    has_resource_module = False
 import socket
 import traceback
@@ -268,9 +273,9 @@ def main(infile, outfile):

     # set up memory limits
     memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
-    total_memory = resource.RLIMIT_AS
-    try:
-        if memory_limit_mb > 0:
+    if memory_limit_mb > 0 and has_resource_module:
+        total_memory = resource.RLIMIT_AS
+        try:
             (soft_limit, hard_limit) = resource.getrlimit(total_memory)
             msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
             print(msg, file=sys.stderr)
@@ -283,9 +288,9 @@ def main(infile, outfile):
             print(msg, file=sys.stderr)
             resource.setrlimit(total_memory, (new_limit, new_limit))

-    except (resource.error, OSError, ValueError) as e:
-        # not all systems support resource limits, so warn instead of failing
-        print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)
+        except (resource.error, OSError, ValueError) as e:
+            # not all systems support resource limits, so warn instead of failing
+            print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)

     # initialize global state
     taskContext = None
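[Editor's note] The configuration.md hunk above extends the description of what appears to be the `spark.executor.pyspark.memory` setting, which is presumably what feeds `PYSPARK_EXECUTOR_MEMORY_MB` to the worker. A hedged Scala sketch of enabling it follows; the app name and the 2g value are illustrative, the config key is an assumption inferred from the docs hunk, and in practice the setting must be in place before executors launch (e.g. via spark-submit).

```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup: ask Spark to cap each Python worker's memory at 2g.
// On Unix-like systems the worker enforces the cap via resource.setrlimit;
// after this patch, platforms without the 'resource' module (e.g. Windows)
// skip the cap with a warning instead of failing on `import resource`.
val spark = SparkSession.builder()
  .appName("pyspark-memory-demo")                   // hypothetical app name
  .config("spark.executor.pyspark.memory", "2g")    // assumed config key, per the docs hunk
  .getOrCreate()
```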
spark git commit: [SPARK-26208][SQL] add headers to empty csv files when header=true
Repository: spark
Updated Branches:
  refs/heads/master 031bd80e4 -> c7d95cced

[SPARK-26208][SQL] add headers to empty csv files when header=true

## What changes were proposed in this pull request?

Add headers to empty CSV files when header=true, because otherwise these files are invalid when read back.

## How was this patch tested?

Added a test to CSVSuite for the roundtrip of an empty DataFrame to a CSV file with headers and back.

Closes #23173 from koertkuipers/feat-empty-csv-with-header.

Authored-by: Koert Kuipers
Signed-off-by: Hyukjin Kwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7d95cce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7d95cce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7d95cce

Branch: refs/heads/master
Commit: c7d95ccedf593edf9fda9ecaf8d0b4dda451440d
Parents: 031bd80
Author: Koert Kuipers
Authored: Sun Dec 2 17:38:25 2018 +0800
Committer: Hyukjin Kwon
Committed: Sun Dec 2 17:38:25 2018 +0800

--
 .../sql/catalyst/csv/UnivocityGenerator.scala |  9 ++++-----
 .../datasources/csv/CSVFileFormat.scala       | 22 ++++++++++++++--------
 .../execution/datasources/csv/CSVSuite.scala  | 13 +++++++++++++
 3 files changed, 31 insertions(+), 13 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 1218f92..2ab376c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -32,7 +32,6 @@ class UnivocityGenerator(
   private val writerSettings = options.asWriterSettings
   writerSettings.setHeaders(schema.fieldNames: _*)
   private val gen = new CsvWriter(writer, writerSettings)
-  private var printHeader = options.headerFlag

   // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
   // When the value is null, this converter should not be called.
@@ -72,15 +71,15 @@ class UnivocityGenerator(
     values
   }

+  def writeHeaders(): Unit = {
+    gen.writeHeaders()
+  }
+
   /**
    * Writes a single InternalRow to CSV using Univocity.
    */
   def write(row: InternalRow): Unit = {
-    if (printHeader) {
-      gen.writeHeaders()
-    }
     gen.writeRow(convertRow(row): _*)
-    printHeader = false
   }

   def writeToString(row: InternalRow): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 4c5a1d3..f7d8a9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -171,15 +171,21 @@ private[csv] class CsvOutputWriter(

   private var univocityGenerator: Option[UnivocityGenerator] = None

-  override def write(row: InternalRow): Unit = {
-    val gen = univocityGenerator.getOrElse {
-      val charset = Charset.forName(params.charset)
-      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-      val newGen = new UnivocityGenerator(dataSchema, os, params)
-      univocityGenerator = Some(newGen)
-      newGen
-    }
+  if (params.headerFlag) {
+    val gen = getGen()
+    gen.writeHeaders()
+  }

+  private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse {
+    val charset = Charset.forName(params.charset)
+    val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+    val newGen = new UnivocityGenerator(dataSchema, os, params)
+    univocityGenerator = Some(newGen)
+    newGen
+  }
+
+  override def write(row: InternalRow): Unit = {
+    val gen = getGen()
     gen.write(row)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/t
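[Editor's note] The key change is moving header emission from the first `write(row)` call into the `CsvOutputWriter` constructor, so a writer that never receives a row still produces a header line. The CSVSuite hunk is truncated above, but a hedged sketch of the roundtrip it exercises follows; the session setup and output path are illustrative.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("empty-csv-demo").getOrCreate()
import spark.implicits._

// An empty DataFrame with a two-column schema.
val empty = Seq.empty[(String, String)].toDF("id", "name")

// With this patch the part files still get the header line "id,name",
// so reading the directory back with header=true recovers the column names.
empty.write.option("header", "true").csv("/tmp/empty-csv")   // illustrative path
val back = spark.read.option("header", "true").csv("/tmp/empty-csv")
assert(back.columns.sameElements(Array("id", "name")))
assert(back.count() == 0)
```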
*/ def write(row: InternalRow): Unit = { -if (printHeader) { - gen.writeHeaders() -} gen.writeRow(convertRow(row): _*) -printHeader = false } def writeToString(row: InternalRow): String = { http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 4c5a1d3..f7d8a9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -171,15 +171,21 @@ private[csv] class CsvOutputWriter( private var univocityGenerator: Option[UnivocityGenerator] = None - override def write(row: InternalRow): Unit = { -val gen = univocityGenerator.getOrElse { - val charset = Charset.forName(params.charset) - val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) - val newGen = new UnivocityGenerator(dataSchema, os, params) - univocityGenerator = Some(newGen) - newGen -} + if (params.headerFlag) { +val gen = getGen() +gen.writeHeaders() + } + private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse { +val charset = Charset.forName(params.charset) +val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) +val newGen = new UnivocityGenerator(dataSchema, os, params) +univocityGenerator = Some(newGen) +newGen + } + + override def write(row: InternalRow): Unit = { +val gen = getGen() gen.write(row) } http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/t