Github user holdenk commented on a diff in the pull request:
https://github.com/apache/spark/pull/12268#discussion_r59268351
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
---
@@ -17,153 +17,197 @@
package org.apache.spark.sql.execution.datasources.csv
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{NullWritable, Text}
-import org.apache.hadoop.mapreduce.RecordWriter
-import org.apache.hadoop.mapreduce.TaskAttemptContext
+import java.io.CharArrayWriter
+import java.nio.charset.{Charset, StandardCharsets}
+
+import com.univocity.parsers.csv.CsvWriter
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.collection.BitSet
-object CSVRelation extends Logging {
-
- def univocityTokenizer(
- file: RDD[String],
- header: Seq[String],
- firstLine: String,
- params: CSVOptions): RDD[Array[String]] = {
- // If header is set, make sure firstLine is materialized before sending to executors.
- file.mapPartitions { iter =>
- new BulkCsvReader(
- if (params.headerFlag) iter.filterNot(_ == firstLine) else iter,
- params,
- headers = header)
- }
- }
+/**
+ * Provides access to CSV data from pure SQL statements.
+ */
+class DefaultSource extends FileFormat with DataSourceRegister {
+
+ override def shortName(): String = "csv"
+
+ override def toString: String = "CSV"
+
+ override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
+
+ override def inferSchema(
+ sqlContext: SQLContext,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ val csvOptions = new CSVOptions(options)
- def csvParser(
- schema: StructType,
- requiredColumns: Array[String],
- params: CSVOptions): Array[String] => Option[InternalRow] = {
- val schemaFields = schema.fields
- val requiredFields = StructType(requiredColumns.map(schema(_))).fields
- val safeRequiredFields = if (params.dropMalformed) {
- // If `dropMalformed` is enabled, then it needs to parse all the values
- // so that we can decide which row is malformed.
- requiredFields ++ schemaFields.filterNot(requiredFields.contains(_))
+ // TODO: Move filtering.
--- End diff --
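As context for the `dropMalformed` branch above: it backs the DROPMALFORMED
parse mode, which drops rows that fail to parse against the schema. A minimal
usage sketch, assuming the SQLContext-based reader of this era and an
illustrative input path:

    // Rows that cannot be parsed are dropped under DROPMALFORMED;
    // the other parse modes are PERMISSIVE (the default) and FAILFAST.
    val df = sqlContext.read
      .format("csv")
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .load("/path/to/data.csv")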
Maybe unify this TODO into the same format as the one in the JSON relation,
which is a bit clearer (e.g. "TODO: Filter files for all formats before calling
buildInternalScan."), assuming this is the same filtering you are referring to?
Also maybe create a JIRA (or link to an existing one), since TODO comments are
easy to lose track of.
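For concreteness, the unified wording (and the sort of filtering it points at)
could look roughly like the sketch below. The `dataFiles` helper is
hypothetical, just to illustrate skipping metadata files such as _SUCCESS
before inference/scanning (`FileStatus` is the Hadoop class already imported in
the diff), and the JIRA id is a placeholder:

    // TODO: Filter files for all formats before calling buildInternalScan.
    // Tracked in SPARK-XXXXX (placeholder).
    private def dataFiles(files: Seq[FileStatus]): Seq[FileStatus] =
      files.filterNot { status =>
        val name = status.getPath.getName
        // Skip empty files and metadata/hidden files such as _SUCCESS.
        status.getLen == 0 || name.startsWith("_") || name.startsWith(".")
      }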