GitHub user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/22237#discussion_r212891660
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
---
@@ -15,50 +15,51 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.unsafe.types.UTF8String
class FailureSafeParser[IN](
rawParser: IN => Seq[InternalRow],
mode: ParseMode,
- schema: StructType,
+ schema: DataType,
columnNameOfCorruptRecord: String,
isMultiLine: Boolean) {
-
- private val corruptFieldIndex =
schema.getFieldIndex(columnNameOfCorruptRecord)
- private val actualSchema = StructType(schema.filterNot(_.name ==
columnNameOfCorruptRecord))
- private val resultRow = new GenericInternalRow(schema.length)
- private val nullResult = new GenericInternalRow(schema.length)
-
// This function takes 2 parameters: an optional partial result, and the
bad record. If the given
// schema doesn't contain a field for corrupted record, we just return
the partial result or a
// row with all fields null. If the given schema contains a field for
corrupted record, we will
// set the bad record to this field, and set other fields according to
the partial result or null.
- private val toResultRow: (Option[InternalRow], () => UTF8String) =>
InternalRow = {
- if (corruptFieldIndex.isDefined) {
- (row, badRecord) => {
- var i = 0
- while (i < actualSchema.length) {
- val from = actualSchema(i)
- resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i,
from.dataType)).orNull
- i += 1
+ private val toResultRow: (Option[InternalRow], () => UTF8String) =>
InternalRow = schema match {
+ case struct: StructType =>
+ val corruptFieldIndex =
struct.getFieldIndex(columnNameOfCorruptRecord)
+ val actualSchema = StructType(struct.filterNot(_.name ==
columnNameOfCorruptRecord))
+ val resultRow = new GenericInternalRow(struct.length)
+ val nullResult = new GenericInternalRow(struct.length)
+ if (corruptFieldIndex.isDefined) {
--- End diff --
Can we move `actualSchema` and `resultRow` inside the
`if (corruptFieldIndex.isDefined) {` block?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]