GitHub user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/16928#discussion_r102636720
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -45,24 +45,41 @@ private[csv] class UnivocityParser(
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any

+  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
+  corruptFieldIndex.foreach { corrFieldIndex =>
+    require(schema(corrFieldIndex).dataType == StringType)
+    require(schema(corrFieldIndex).nullable)
+  }
+
+  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
+
   private val valueConverters =
-    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray

   private val parser = new CsvParser(options.asParserSettings)

   private var numMalformedRecords = 0

   private val row = new GenericInternalRow(requiredSchema.length)

-  private val indexArr: Array[Int] = {
+  // This parser loads an `indexArr._1`-th position value in input tokens,
+  // then put the value in `row(indexArr._2)`.
+  private val indexArr: Array[(Int, Int)] = {
     val fields = if (options.dropMalformed) {
       // If `dropMalformed` is enabled, then it needs to parse all the values
       // so that we can decide which row is malformed.
       requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
     } else {
       requiredSchema
     }
-    fields.map(schema.indexOf(_: StructField)).toArray
+    val fieldsWithIndexes = fields.zipWithIndex
+    corruptFieldIndex.map { case corrFieldIndex =>
+      fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
+    }.getOrElse {
+      fieldsWithIndexes
+    }.map { case (f, i) =>
+      (inputSchema.indexOf(f), i)
+    }.toArray
--- End diff --
@maropu, I understand it is nice to avoid per-record computation. However, I really want to clean this part up (I previously had a hard time cleaning up this code path). I am not fully confident in the current state here, but I don't have a better idea for handling only this piece in this way.
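Just to spell out how I read the mapping, here is a minimal sketch (not the actual patch; the standalone `convert` signature and its parameter names are only for illustration) of how the `(inputIndex, rowIndex)` pairs in `indexArr` would drive the per-record loop:

```scala
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

// Illustrative only: each (inputIndex, rowIndex) pair reads the token at its
// position in `inputSchema` and writes the converted value at its position
// in the required (output) schema.
def convert(
    tokens: Array[String],
    indexArr: Array[(Int, Int)],
    valueConverters: Array[String => Any],
    row: GenericInternalRow): GenericInternalRow = {
  indexArr.foreach { case (inputIndex, rowIndex) =>
    row.update(rowIndex, valueConverters(inputIndex)(tokens(inputIndex)))
  }
  row
}
```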
How about moving this into `convertWithParseMode`? I tried a rough change to illustrate the suggestion: https://github.com/apache/spark/compare/619094a...HyukjinKwon:another-suggestion?expand=1
There we can also avoid more per-record computation while building the safe tokens, for example the per-record if-condition on the parse mode.
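For instance, the parse-mode branch could be resolved once when the parser is constructed rather than per record. A rough sketch, using a hypothetical `ParseMode` ADT and handler that are not Spark's actual API:

```scala
// Illustrative sketch: pick the malformed-record handler once, up front, so
// the hot per-record path never branches on the parse mode. `ParseMode` and
// `malformedHandler` are hypothetical names, not the real CSVOptions API.
sealed trait ParseMode
case object Permissive extends ParseMode
case object DropMalformed extends ParseMode
case object FailFast extends ParseMode

def malformedHandler(mode: ParseMode, numFields: Int): Array[String] => Option[Seq[Any]] =
  mode match {
    case DropMalformed => _ => None                                  // skip the row
    case Permissive    => _ => Some(Seq.fill[Any](numFields)(null))  // null-fill the row
    case FailFast      =>
      tokens => throw new RuntimeException(s"Malformed line: ${tokens.mkString(",")}")
  }
```

The per-record path would then just apply the prebuilt function instead of re-checking the mode for every row.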
We could add it there for now, and I can resolve the remaining per-record computation in another PR. Do you think this makes sense? WDYT @cloud-fan?