ted-jenks commented on code in PR #39927:
URL: https://github.com/apache/spark/pull/39927#discussion_r1098497933
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala:
##########
@@ -29,17 +29,23 @@ object CSVUtils {
* This is currently being used in CSV schema inference.
*/
def filterCommentAndEmpty(lines: Dataset[String], options: CSVOptions): Dataset[String] = {
- // Note that this was separately made by SPARK-18362. Logically, this should be the same
- // with the one below, `filterCommentAndEmpty` but execution path is different. One of them
- // might have to be removed in the near future if possible.
+ val nonEmptyLines = filterEmpty(lines)
+ filterComment(nonEmptyLines, options)
+ }
+
+ def filterComment(lines: Dataset[String], options: CSVOptions): Dataset[String] = {
+ import lines.sqlContext.implicits._
+ lines.rdd.mapPartitions(
+ CSVExprUtils.filterComment(_, options)).toDS()
+ }
+
+ /**
+ * Filter empty lines from dataset. This is currently being used in CSV schema inference.
+ */
+ private def filterEmpty(lines: Dataset[String]): Dataset[String] = {
Review Comment:
As noted by @HyukjinKwon, blank record skipping is used for
`DataFrameReader.csv(dataset: Dataset[String])`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]