HyukjinKwon commented on a change in pull request #15813:
URL: https://github.com/apache/spark/pull/15813#discussion_r452740030



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
##########
@@ -173,51 +179,37 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 
-  private def baseRdd(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      inputPaths: Seq[String]): RDD[String] = {
-    readText(sparkSession, options, inputPaths.mkString(","))
-  }
-
-  private def tokenRdd(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      header: Array[String],
-      inputPaths: Seq[String]): RDD[Array[String]] = {
-    val rdd = baseRdd(sparkSession, options, inputPaths)
-    // Make sure firstLine is materialized before sending to executors
-    val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null
-    CSVRelation.univocityTokenizer(rdd, firstLine, options)
-  }
-
   /**
    * Returns the first line of the first non-empty file in path
    */
-  private def findFirstLine(options: CSVOptions, rdd: RDD[String]): String = {
+  private def findFirstLine(options: CSVOptions, lines: Dataset[String]): String = {
+    import lines.sqlContext.implicits._
+    val nonEmptyLines = lines.filter(length(trim($"value")) > 0)
     if (options.isCommentSet) {
-      val comment = options.comment.toString
-      rdd.filter { line =>
-        line.trim.nonEmpty && !line.startsWith(comment)
-      }.first()
+      nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).first()
     } else {
-      rdd.filter { line =>
-        line.trim.nonEmpty
-      }.first()
+      nonEmptyLines.first()
     }
   }
 
   private def readText(
       sparkSession: SparkSession,
       options: CSVOptions,
-      location: String): RDD[String] = {
+      inputPaths: Seq[String]): Dataset[String] = {
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sparkSession.sparkContext.textFile(location)
+      sparkSession.baseRelationToDataFrame(
+        DataSource.apply(
+          sparkSession,
+          paths = inputPaths,
+          className = classOf[TextFileFormat].getName
+        ).resolveRelation(checkFilesExist = false))
+        .select("value").as[String](Encoders.STRING)
     } else {
       val charset = options.charset
-      sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](location)
+      val rdd = sparkSession.sparkContext

Review comment:
      @rxin, I made a PR to address this at https://github.com/apache/spark/pull/29063, FYI.
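
      For context, here is a minimal, self-contained sketch of the Dataset-based first-line lookup this hunk moves to, using only public APIs. The session setup, sample data, and the `commentChar` value are illustrative stand-ins, not the actual `CSVFileFormat` internals; `spark.read.text` plays the role of the internal `DataSource`/`TextFileFormat` path in the diff.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{length, trim}

object FirstLineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("csv-first-line-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for what readText now produces: a Dataset[String] whose
    // single column is named "value".
    val lines: Dataset[String] = Seq("", "# a comment", "a,b", "1,2").toDS()

    // Mirror the new findFirstLine: drop blank lines once up front, then
    // skip comment lines only when a comment character is configured.
    val commentChar: Option[String] = Some("#") // hypothetical option value
    val nonEmptyLines = lines.filter(length(trim($"value")) > 0)
    val firstLine = commentChar match {
      case Some(c) => nonEmptyLines.filter(!$"value".startsWith(c)).first()
      case None    => nonEmptyLines.first()
    }
    println(firstLine) // prints: a,b

    spark.stop()
  }
}
```

      Factoring the blank-line filter into a single shared `nonEmptyLines` Dataset is the main cleanup in the hunk: it avoids repeating the `line.trim.nonEmpty` predicate in both branches of the old RDD version.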



