jaceklaskowski commented on code in PR #39907:
URL: https://github.com/apache/spark/pull/39907#discussion_r1149152549


##########
docs/sql-data-sources-csv.md:
##########
@@ -102,6 +102,12 @@ Data source options of CSV can be set via:
     <td>For reading, uses the first line as names of columns. For writing, 
writes the names of columns as the first line. Note that if the given path is a 
RDD of Strings, this header option will remove all lines same with the header 
if exists. CSV built-in functions ignore this option.</td>
     <td>read/write</td>
   </tr>
+  <tr>
+    <td><code>skipLines</code></td>
+    <td>0</td>
+    <td>Sets a number of non-empty, uncommented lines to skip before parsing 
each of the CSV files. If the <code>header</code> option is set to 
<code>true</code>, the first line after the number of <code>skipLines</code> 
will be taken as the header.</td>

Review Comment:
   nit: s/a number/the number + "before parsing CSV files"



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala:
##########
@@ -25,21 +25,33 @@ import org.apache.spark.sql.functions._
 
 object CSVUtils {
   /**
-   * Filter ignorable rows for CSV dataset (lines empty and starting with 
`comment`).
-   * This is currently being used in CSV schema inference.
+   * Filter blank lines, remove comments, and skip specified rows from a CSV 
iterator. Then blank
+   * entries (and comments if set) are removed. This is currently being used 
in CSV schema
+   * inference.
    */
-  def filterCommentAndEmpty(lines: Dataset[String], options: CSVOptions): 
Dataset[String] = {
+  def filterUnwantedLines(lines: Dataset[String], options: CSVOptions): 
Dataset[String] = {
     // Note that this was separately made by SPARK-18362. Logically, this 
should be the same
-    // with the one below, `filterCommentAndEmpty` but execution path is 
different. One of them
+    // with the one below, `filterUnwantedLines` but execution path is 
different. One of them
     // might have to be removed in the near future if possible.
     import lines.sqlContext.implicits._
     val aliased = lines.toDF("value")
     val nonEmptyLines = aliased.filter(length(trim($"value")) > 0)
-    if (options.isCommentSet) {
-      
nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
-    } else {
-      nonEmptyLines.as[String]
+    val commentFilteredLines = {
+      if (options.isCommentSet) {
+        
nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
+      } else {
+        nonEmptyLines.as[String]
+      }
     }
+    // Note that unlike actual CSV reading path, it simply filters the given 
skipped lines.
+    // Therefore, this skips the line same with the skipped lines if exists.
+    val linesToSkip = commentFilteredLines.head(options.skipLines)
+    commentFilteredLines.rdd

Review Comment:
   Is `.rdd` required here? `Dataset.mapPartitions` should give us what we 
want, shouldn't it?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala:
##########
@@ -25,21 +25,33 @@ import org.apache.spark.sql.functions._
 
 object CSVUtils {
   /**
-   * Filter ignorable rows for CSV dataset (lines empty and starting with 
`comment`).
-   * This is currently being used in CSV schema inference.
+   * Filter blank lines, remove comments, and skip specified rows from a CSV 
iterator. Then blank
+   * entries (and comments if set) are removed. This is currently being used 
in CSV schema
+   * inference.
    */
-  def filterCommentAndEmpty(lines: Dataset[String], options: CSVOptions): 
Dataset[String] = {
+  def filterUnwantedLines(lines: Dataset[String], options: CSVOptions): 
Dataset[String] = {
     // Note that this was separately made by SPARK-18362. Logically, this 
should be the same
-    // with the one below, `filterCommentAndEmpty` but execution path is 
different. One of them
+    // with the one below, `filterUnwantedLines` but execution path is 
different. One of them
     // might have to be removed in the near future if possible.
     import lines.sqlContext.implicits._
     val aliased = lines.toDF("value")
     val nonEmptyLines = aliased.filter(length(trim($"value")) > 0)
-    if (options.isCommentSet) {
-      
nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
-    } else {
-      nonEmptyLines.as[String]
+    val commentFilteredLines = {
+      if (options.isCommentSet) {
+        
nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
+      } else {
+        nonEmptyLines.as[String]
+      }
     }
+    // Note that unlike actual CSV reading path, it simply filters the given 
skipped lines.
+    // Therefore, this skips the line same with the skipped lines if exists.
+    val linesToSkip = commentFilteredLines.head(options.skipLines)
+    commentFilteredLines.rdd
+      .mapPartitions { iter =>
+        iter.filterNot(linesToSkip.contains(_))

Review Comment:
   `contains` should be enough. Moreover, the following should work too:
   
   ```
   .mapPartitions(_.filterNot(linesToSkip.contains))
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -401,10 +404,17 @@ private[sql] object UnivocityParser {
       schema,
       parser.options.columnNameOfCorruptRecord)
 
+    val handleSkipLines: () => Unit =
+      () => 1.to(parser.options.skipLines).foreach(_ => tokenizer.parseNext())
     val handleHeader: () => Unit =
       () => headerChecker.checkHeaderColumnNames(tokenizer)
 
-    convertStream(inputStream, tokenizer, handleHeader, 
parser.options.charset) { tokens =>
+    convertStream(
+      inputStream,
+      tokenizer,
+      handleHeader,
+      handleSkipLines,
+      parser.options.charset) { tokens =>
       safeParser.parse(tokens)

Review Comment:
   `safeParser.parse` should be enough. If so, replace `{`s with `(`s, too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to