[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17315 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107055987

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+    rawParser: IN => Seq[InternalRow],
+    mode: String,
+    schema: StructType,
+    columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+  private val nullResult = new GenericInternalRow(schema.length)
+
+  // This function takes 2 parameters: an optional partial result, and the bad record. If the given
+  // schema doesn't contain a field for corrupted record, we just return the partial result or a
+  // row with all fields null. If the given schema contains a field for corrupted record, we will
+  // set the bad record to this field, and set other fields according to the partial result or null.
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
+    if (corruptFieldIndex.isDefined) {
+      (row, badRecord) => {
+        var i = 0
+        while (i < actualSchema.length) {
+          val f = actualSchema(i)
+          resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull
+          i += 1
+        }
+        resultRow(corruptFieldIndex.get) = badRecord()
+        resultRow
+      }
+    } else {
+      (row, _) => row.getOrElse(nullResult)
+    }
+  }
+
+  def parse(input: IN): Iterator[InternalRow] = {
+    try {
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

we should add tests in a follow-up
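The three `catch` branches in `parse` correspond to the parse modes. The following is a minimal Spark-free sketch of that dispatch. It assumes rows can be modeled as `Vector[Any]` and leaves out the corrupt-record column that `toResultRow` fills. `BadRecord`, `SafeParseSketch`, and `SafeParseDemo` are hypothetical names. `BadRecord` is modeled loosely on the `BadRecordException` fields used in the diff (`record`, `partialResult`, `cause`).

```scala
// Simplified, Spark-free model of the catch-branch dispatch in FailureSafeParser.parse.
// Every name below is hypothetical; rows are modeled as Vector[Any].

// Stand-in for BadRecordException: raw record, optional partial result, and cause.
final case class BadRecord(
    record: () => String,
    partialResult: () => Option[Vector[Any]],
    cause: Throwable) extends Exception(cause)

object SafeParseSketch {
  def parseWithMode[IN](
      rawParser: IN => Seq[Vector[Any]],
      mode: String,
      width: Int)(input: IN): Iterator[Vector[Any]] = {
    try {
      // rawParser runs eagerly inside the try, so a BadRecord it throws is caught below.
      rawParser(input).iterator
    } catch {
      // PERMISSIVE: keep the partial result, or an all-null row if there is none.
      // (Per the review discussion, Spark's ParseModes.isPermissiveMode also accepts
      // unrecognized strings; this sketch does not.)
      case e: BadRecord if mode.equalsIgnoreCase("PERMISSIVE") =>
        Iterator(e.partialResult().getOrElse(Vector.fill[Any](width)(null)))
      // DROPMALFORMED: emit no row for the bad record.
      case _: BadRecord if mode.equalsIgnoreCase("DROPMALFORMED") =>
        Iterator.empty
      // Anything else (FAILFAST here): surface the original cause.
      case e: BadRecord => throw e.cause
    }
  }
}

object SafeParseDemo {
  // A toy raw parser: comma-separated integers, failing with BadRecord on bad input.
  val intParser: String => Seq[Vector[Any]] = s => {
    try {
      Seq(s.split(",").map(_.trim.toInt).toVector)
    } catch {
      case e: NumberFormatException => throw BadRecord(() => s, () => None, e)
    }
  }
}
```

Take `SafeParseDemo.intParser` and the input `"1,x"`. Under PERMISSIVE it yields a single all-null row. Under DROPMALFORMED it yields no rows. Under FAILFAST it rethrows the `NumberFormatException`. Checks like these, one per mode, are the kind of test coverage the reviewers ask for.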
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107055187

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
@@ -0,0 +1,80 @@
...
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

uh... this is kind of tricky.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107053686

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
@@ -0,0 +1,80 @@
...
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

`ParseModes.isPermissiveMode` returns true if the mode string is invalid.
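Suppose `isPermissiveMode` behaves as the comment above describes. Then a misspelled mode never reaches the final `throw e.cause` branch, because the first (permissive) branch catches it. The sketch below re-implements the predicates under that assumption. The object and constant names are illustrative, not copied from Spark. It reports which `catch` branch of `FailureSafeParser.parse` a given mode string would reach.

```scala
// Illustrative re-implementation of the mode predicates, assuming they behave as
// described in the review: an unrecognized mode string counts as PERMISSIVE.
object ParseModesSketch {
  val PermissiveMode = "PERMISSIVE"
  val DropMalformedMode = "DROPMALFORMED"
  val FailFastMode = "FAILFAST"

  def isValidMode(mode: String): Boolean =
    Set(PermissiveMode, DropMalformedMode, FailFastMode).contains(mode.toUpperCase)

  def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DropMalformedMode

  // Invalid strings fall back to PERMISSIVE instead of being rejected.
  def isPermissiveMode(mode: String): Boolean =
    if (isValidMode(mode)) mode.toUpperCase == PermissiveMode else true

  // Which catch branch of FailureSafeParser.parse a mode string would reach,
  // given that the branches are tried in this order.
  def branchFor(mode: String): String =
    if (isPermissiveMode(mode)) "permissive"
    else if (isDropMalformedMode(mode)) "drop"
    else "rethrow"
}
```

Under this assumption, a typo such as `"PERMISIVE"` reaches the permissive branch. Only a valid `FAILFAST` string reaches the rethrow branch.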
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107020700

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
@@ -0,0 +1,80 @@
...
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

I just did a re-check. This part of the CSV code is kind of messy; the code follows no consistent rule. To begin with, we should have test cases.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107014888

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -233,81 +187,41 @@ class UnivocityParser(
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
    */
-  def parse(input: String): Option[InternalRow] = convert(tokenizer.parseLine(input))
-
-  private def convert(tokens: Array[String]): Option[InternalRow] = {
-    convertWithParseMode(tokens) { tokens =>
-      var i: Int = 0
-      while (i < tokenIndexArr.length) {
-        // It anyway needs to try to parse since it decides if this row is malformed
-        // or not after trying to cast in `DROPMALFORMED` mode even if the casted
-        // value is not stored in the row.
-        val from = tokenIndexArr(i)
-        val to = rowIndexArr(i)
-        val value = valueConverters(from).apply(tokens(from))
-        if (i < requiredSchema.length) {
-          row(to) = value
-        }
-        i += 1
-      }
-      row
-    }
-  }
-
-  private def convertWithParseMode(
-      tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = {
-    if (options.dropMalformed && dataSchema.length != tokens.length) {
-      if (numMalformedRecords < options.maxMalformedLogPerPartition) {
-        logWarning(s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}")
-      }
-      if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) {
-        logWarning(
-          s"More than ${options.maxMalformedLogPerPartition} malformed records have been " +
-            "found on this partition. Malformed records from now on will not be logged.")
+  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
+
+  private def convert(tokens: Array[String]): InternalRow = {
+    if (tokens.length != schema.length) {
+      // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
+      // However, we still have chance to parse some of the tokens, by adding extra null tokens in
+      // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
+      val checkedTokens = if (schema.length > tokens.length) {
+        tokens ++ new Array[String](schema.length - tokens.length)
+      } else {
+        tokens.take(schema.length)
       }
-      numMalformedRecords += 1
-      None
-    } else if (options.failFast && dataSchema.length != tokens.length) {
-      throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
-        s"${tokens.mkString(options.delimiter.toString)}")
-    } else {
-      // If a length of parsed tokens is not equal to expected one, it makes the length the same
-      // with the expected. If the length is shorter, it adds extra tokens in the tail.
-      // If longer, it drops extra tokens.
-      //
-      // TODO: Revisit this; if a length of tokens does not match an expected length in the schema,
-      // we probably need to treat it as a malformed record.
-      // See an URL below for related discussions:
-      // https://github.com/apache/spark/pull/16928#discussion_r102657214
-      val checkedTokens = if (options.permissive && dataSchema.length != tokens.length) {
-        if (dataSchema.length > tokens.length) {
-          tokens ++ new Array[String](dataSchema.length - tokens.length)
-        } else {
-          tokens.take(dataSchema.length)
+      def getPartialResult(): Option[InternalRow] = {
--- End diff --

Nit: `getPartialResult()` -> `getPartialResult`
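The padding and truncation step in the new `convert` can be tried in isolation. Below is a minimal sketch of the `checkedTokens` expression, with hypothetical object and method names. Missing trailing tokens become `null` and extra tokens are dropped.

```scala
object TokenAlignSketch {
  // Pad with null tokens when there are fewer tokens than schema fields,
  // or drop the extra tokens when there are more, as in `checkedTokens` above.
  def alignTokens(tokens: Array[String], schemaLength: Int): Array[String] =
    if (schemaLength > tokens.length) {
      tokens ++ new Array[String](schemaLength - tokens.length)
    } else {
      tokens.take(schemaLength)
    }
}
```

Here, `alignTokens(Array("1", "2"), 3)` gives `Array("1", "2", null)`, and `alignTokens(Array("1", "2", "3", "4"), 3)` gives `Array("1", "2", "3")`. Per the comment in the diff, the record is still treated as malformed. The aligned tokens only give the parser a chance to salvage some fields.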
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107014069

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -46,85 +46,39 @@ class UnivocityParser(
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any
-  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
-  corruptFieldIndex.foreach { corrFieldIndex =>
-    require(schema(corrFieldIndex).dataType == StringType)
-    require(schema(corrFieldIndex).nullable)
-  }
-
-  private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
-
-  private val tokenizer = new CsvParser(options.asParserSettings)
-
-  private var numMalformedRecords = 0
-
-  private val row = new GenericInternalRow(requiredSchema.length)
-  // In `PERMISSIVE` parse mode, we should be able to put the raw malformed row into the field
-  // specified in `columnNameOfCorruptRecord`. The raw input is retrieved by this method.
-  private def getCurrentInput(): String = tokenizer.getContext.currentParsedContent().stripLineEnd
+  // Retrieve the raw record string.
+  private def getCurrentInput(): UTF8String = {
--- End diff --

Nit: `getCurrentInput()` -> `getCurrentInput`
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107013706

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -435,14 +442,20 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }
     verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
+    val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
--- End diff --

Nit: `dataSchema` -> `actualSchema`? To be consistent with what we did in the other place?
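Both `DataFrameReader` hunks compute the schema without the corrupt-record column. So does `FailureSafeParser`, where the same value is called `actualSchema`. Below is a minimal sketch using a hypothetical two-field `Field` model as a stand-in for `StructField`. It uses `_corrupt_record`, Spark's default value for `columnNameOfCorruptRecord`.

```scala
// Hypothetical stand-in for StructField: just a name and a type label.
final case class Field(name: String, dataType: String)

object SchemaSketch {
  // Same shape as `StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))`:
  // every field except the corrupt-record column, in the original order.
  def actualSchema(schema: Seq[Field], columnNameOfCorruptRecord: String): Seq[Field] =
    schema.filterNot(_.name == columnNameOfCorruptRecord)
}
```

The filtered schema keeps the remaining fields in their original order. This is why `FailureSafeParser` needs a name lookup (`schema.fieldIndex`) to map them back into the full row.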
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107013316

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
@@ -0,0 +1,80 @@
...
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

Do we need a test case to cover it?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107013197

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
@@ -0,0 +1,80 @@
...
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
+    if (corruptFieldIndex.isDefined) {
+      (row, badRecord) => {
+        var i = 0
+        while (i < actualSchema.length) {
+          val f = actualSchema(i)
--- End diff --

Nit: `f` -> `from`
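The loop under discussion copies field `i` of the partial result, which is laid out by `actualSchema`, into the slot with the same field name in the full `schema`. It then writes the raw record into the corrupt-record slot. Below is a Spark-free sketch of that remapping. It assumes rows are plain sequences and schemas are lists of field names. `ResultRowSketch` is hypothetical. Unlike the diff, which reuses a single row, it allocates a fresh array on each call.

```scala
// Spark-free model of toResultRow when the schema contains a corrupt-record column.
// Hypothetical names; rows are IndexedSeq[Any], schemas are lists of field names.
object ResultRowSketch {
  def toResultRow(
      schemaNames: IndexedSeq[String],
      corruptColumn: String,
      partial: Option[IndexedSeq[Any]],
      badRecord: () => String): IndexedSeq[Any] = {
    // Layout of the partial result: every field except the corrupt-record column.
    val actualNames = schemaNames.filterNot(_ == corruptColumn)
    // Fresh array per call; the diff instead reuses a single GenericInternalRow.
    val result = Array.fill[Any](schemaNames.length)(null)
    var i = 0
    while (i < actualNames.length) {
      val from = actualNames(i) // the reviewer's suggested name for `f`
      // Field i of the partial result goes to the slot with the same name in the full schema.
      result(schemaNames.indexOf(from)) = partial.map(_(i)).orNull
      i += 1
    }
    result(schemaNames.indexOf(corruptColumn)) = badRecord()
    result.toIndexedSeq
  }
}
```

Take the schema `a, _corrupt_record, b`, the partial result `(1, 2)`, and the raw record `"1,2,x"`. The result is `(1, "1,2,x", 2)`. With no partial result, both data fields stay null.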
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107012038

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -382,11 +383,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }
     verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
+    val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
--- End diff --

Nit: `dataSchema` -> `actualSchema`? To be consistent with what we did in the other place?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r107011371

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
@@ -0,0 +1,80 @@
...
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

This is `FAIL_FAST_MODE`, if my understanding is correct. Should we issue an error message that includes `FAILFAST`, like we did before? Is this also a behavior change? If users did not spell the mode string correctly, we used to treat it as `PERMISSIVE` mode. Now, we treat it as `FAILFAST` mode.
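One way to address the first question is to wrap the cause in an exception whose message names the mode. This is sketched as a hypothetical helper, not as anything the PR contains. It reuses the wording of the removed `Malformed line in FAILFAST mode: ...` message, so the error still carries the original exception as its cause.

```scala
object FailFastSketch {
  // Hypothetical FAILFAST branch: name the mode in the message (as the old
  // UnivocityParser code did) and keep the parser's exception as the cause.
  def failFast(record: String, cause: Throwable): Nothing =
    throw new RuntimeException(s"Malformed line in FAILFAST mode: $record", cause)
}
```

In `FailureSafeParser.parse`, the record string would presumably come from `e.record()` in the final `catch` branch. Whether to rethrow the bare cause or wrap it is the design question the comment raises.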
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106852242 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala --- @@ -233,81 +187,39 @@ class UnivocityParser( * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). */ - def parse(input: String): Option[InternalRow] = convert(tokenizer.parseLine(input)) - - private def convert(tokens: Array[String]): Option[InternalRow] = { -convertWithParseMode(tokens) { tokens => - var i: Int = 0 - while (i < tokenIndexArr.length) { -// It anyway needs to try to parse since it decides if this row is malformed -// or not after trying to cast in `DROPMALFORMED` mode even if the casted -// value is not stored in the row. -val from = tokenIndexArr(i) -val to = rowIndexArr(i) -val value = valueConverters(from).apply(tokens(from)) -if (i < requiredSchema.length) { - row(to) = value -} -i += 1 - } - row -} - } - - private def convertWithParseMode( - tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = { -if (options.dropMalformed && dataSchema.length != tokens.length) { - if (numMalformedRecords < options.maxMalformedLogPerPartition) { -logWarning(s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") - } - if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { -logWarning( - s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + -"found on this partition. Malformed records from now on will not be logged.") + def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) + + private def convert(tokens: Array[String]): InternalRow = { +if (tokens.length != schema.length) { + // If the number of tokens doesn't match the schema, we should treat it as a malformed record. 
+ // However, we still have chance to parse some of the tokens, by adding extra null tokens in + // the tail if the number is smaller, or by dropping extra tokens if the number is larger. + val checkedTokens = if (schema.length > tokens.length) { +tokens ++ new Array[String](schema.length - tokens.length) + } else { +tokens.take(schema.length) } - numMalformedRecords += 1 - None -} else if (options.failFast && dataSchema.length != tokens.length) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: " + -s"${tokens.mkString(options.delimiter.toString)}") -} else { - // If a length of parsed tokens is not equal to expected one, it makes the length the same - // with the expected. If the length is shorter, it adds extra tokens in the tail. - // If longer, it drops extra tokens. - // - // TODO: Revisit this; if a length of tokens does not match an expected length in the schema, - // we probably need to treat it as a malformed record. - // See an URL below for related discussions: - // https://github.com/apache/spark/pull/16928#discussion_r102657214 - val checkedTokens = if (options.permissive && dataSchema.length != tokens.length) { -if (dataSchema.length > tokens.length) { - tokens ++ new Array[String](dataSchema.length - tokens.length) -} else { - tokens.take(dataSchema.length) + def getPartialResult(): Option[InternalRow] = { +try { + Some(convert(checkedTokens)) +} catch { + case _: BadRecordException => None } - } else { -tokens } - + throw BadRecordException( +() => getCurrentInput(), +getPartialResult, +new RuntimeException("Malformed CSV record")) +} else { try { -Some(convert(checkedTokens)) +for (i <- requiredSchema.indices) { --- End diff -- (It seems this one was missed. I am fine with it if it is new code, but I am worried about changing from `while` to `for`. It might not be a really big deal, though.)
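The new `convert` in `UnivocityParser` forces the token array to the schema length before attempting a partial parse: short lines are padded with nulls and long lines are truncated. A standalone sketch of just that rule, where the `TokenPadding` object and its method names are hypothetical and only the pad/truncate logic comes from the diff:

```scala
object TokenPadding {
  // Hypothetical helper: force `tokens` to exactly `schemaLength` entries, following the
  // rule in the diff. Missing trailing tokens become null; surplus tokens are dropped.
  def checkTokens(tokens: Array[String], schemaLength: Int): Array[String] =
    if (schemaLength > tokens.length) tokens ++ new Array[String](schemaLength - tokens.length)
    else tokens.take(schemaLength)

  // A line is malformed whenever its token count differs from the schema length,
  // even though a padded or truncated partial result can still be attempted.
  def isMalformed(tokens: Array[String], schemaLength: Int): Boolean =
    tokens.length != schemaLength
}
```

Note that in the diff a count mismatch always ends in a `BadRecordException`; the padded tokens only feed the partial result, which the surrounding parse mode then decides whether to keep.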
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106847502 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala --- @@ -113,8 +113,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { (file: PartitionedFile) => { val conf = broadcastedHadoopConf.value.value - val parser = new UnivocityParser(dataSchema, requiredSchema, parsedOptions) - CSVDataSource(parsedOptions).readFile(conf, file, parser, parsedOptions) + val parser = new UnivocityParser( +StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)), +StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)), +parsedOptions) + CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema) --- End diff -- We need it, and the logic is in `readFile`.
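The `UnivocityParser` in this diff is built from schemas with the corrupt-record column filtered out, while `readFile` still receives the full `requiredSchema` so that the failure-safe wrapper can fill that column in. A minimal sketch of the split, assuming a hypothetical `Field` case class in place of Spark's `StructField`:

```scala
// Hypothetical, minimal stand-in for Spark's StructField.
final case class Field(name: String, dataType: String, nullable: Boolean = true)

object SchemaSplit {
  // The schema the raw parser should see: the user schema minus the
  // corrupt-record column, which only the failure-safe wrapper fills in.
  def dataSchema(schema: Seq[Field], corruptColumn: String): Seq[Field] =
    schema.filterNot(_.name == corruptColumn)
}
```

If the user schema has no corrupt-record column, the filter is a no-op and both parsers see the same fields.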
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106847415 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -55,108 +52,6 @@ class JacksonParser( private val factory = new JsonFactory() options.setJacksonOptions(factory) - private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) - - private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) - corruptFieldIndex.foreach { corrFieldIndex => -require(schema(corrFieldIndex).dataType == StringType) -require(schema(corrFieldIndex).nullable) - } --- End diff -- This is just a sanity check; it is actually already done in `DataFrameReader.csv/json` and `JsonFileFormat/CSVFileFormat`.
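The removed `require` calls enforce that a corrupt-record column, if present, is a nullable string. A minimal sketch of that invariant, assuming a hypothetical `Field` case class with string type names rather than Spark's `StructField`/`StringType`:

```scala
// Hypothetical, minimal stand-in for Spark's StructField; data types are plain strings here.
final case class Field(name: String, dataType: String, nullable: Boolean = true)

object CorruptColumnCheck {
  // Mirrors the removed `require` calls: if the corrupt-record column exists,
  // it must be a nullable string column. A missing column is accepted.
  def verify(schema: Seq[Field], corruptColumn: String): Unit =
    schema.find(_.name == corruptColumn).foreach { f =>
      require(f.dataType == "string", s"$corruptColumn must be of string type")
      require(f.nullable, s"$corruptColumn must be nullable")
    }
}
```

`require` throws `IllegalArgumentException`, so a violation surfaces when the check runs rather than later as a bad write into the row.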
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106842646 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala --- @@ -113,8 +113,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { (file: PartitionedFile) => { val conf = broadcastedHadoopConf.value.value - val parser = new UnivocityParser(dataSchema, requiredSchema, parsedOptions) - CSVDataSource(parsedOptions).readFile(conf, file, parser, parsedOptions) + val parser = new UnivocityParser( +StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)), +StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)), +parsedOptions) + CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema) --- End diff -- We do not need `FailureSafeParser` in this case?
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106840886 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -55,108 +52,6 @@ class JacksonParser( private val factory = new JsonFactory() options.setJacksonOptions(factory) - private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) - - private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) - corruptFieldIndex.foreach { corrFieldIndex => -require(schema(corrFieldIndex).dataType == StringType) -require(schema(corrFieldIndex).nullable) - } --- End diff -- The above check seems to be missing in the new code.
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106840806 --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R --- @@ -1354,9 +1354,8 @@ test_that("column functions", { # passing option df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}"))) schema2 <- structType(structField("date", "date")) - expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))), -error = function(e) { stop(e) }), - paste0(".*(java.lang.NumberFormatException: For input string:).*")) + s <- collect(select(df, from_json(df$col, schema2))) + expect_equal(s[[1]][[1]], NA) --- End diff -- uh, I see.
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106840345 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -382,11 +383,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord) +val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) val createParser = CreateJacksonParser.string _ val parsed = jsonDataset.rdd.mapPartitions { iter => - val parser = new JacksonParser(schema, parsedOptions) - iter.flatMap(parser.parse(_, createParser, UTF8String.fromString)) + val rawParser = new JacksonParser(dataSchema, parsedOptions) + val parser = new FailureSafeParser[String]( +input => rawParser.parse(input, createParser, UTF8String.fromString), +parsedOptions.parseMode, +schema, --- End diff -- We need `schema` anyway; passing `dataSchema` would only save one line in `FailureSafeParser`.
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106840206 --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R --- @@ -1354,9 +1354,8 @@ test_that("column functions", { # passing option df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}"))) schema2 <- structType(structField("date", "date")) - expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))), -error = function(e) { stop(e) }), - paste0(".*(java.lang.NumberFormatException: For input string:).*")) + s <- collect(select(df, from_json(df$col, schema2))) + expect_equal(s[[1]][[1]], NA) --- End diff -- Yeah, it's a minor bug fix, see https://github.com/cloud-fan/spark/pull/4. I'm not sure if it is worth a ticket.
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106840169 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class FailureSafeParser[IN]( +func: IN => Seq[InternalRow], +mode: String, +schema: StructType, +columnNameOfCorruptRecord: String) { + + private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) + private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) + private val resultRow = new GenericInternalRow(schema.length) + private val nullResult = new GenericInternalRow(schema.length) + + // This function takes 2 parameters: an optional partial result, and the bad record. If the given + // schema doesn't contain a field for corrupted record, we just return the partial result or a + // row with all fields null. 
If the given schema contains a field for corrupted record, we will + // set the bad record to this field, and set other fields according to the partial result or null. + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { +if (corruptFieldIndex.isDefined) { + (row, badRecord) => { +var i = 0 +while (i < actualSchema.length) { + val f = actualSchema(i) + resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull + i += 1 +} +resultRow(corruptFieldIndex.get) = badRecord() +resultRow + } +} else { + (row, badRecord) => row.getOrElse(nullResult) --- End diff -- Nit: `(row, badRecord)` -> `(row, _)`
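`toResultRow` copies each field of the partial result (which is laid out by the schema without the corrupt column) into its position in the full schema, then stores the raw record in the corrupt column. A minimal sketch of that mapping, assuming rows are modeled as `IndexedSeq[Any]` and schemas as lists of field names; the real code reuses the preallocated `resultRow`/`nullResult` instead of allocating per call:

```scala
object ResultRow {
  // schema: full user schema (field names), possibly including the corrupt column.
  // partial: values for the data fields only (schema minus the corrupt column), if any.
  // badRecord: thunk producing the raw input; only evaluated when a corrupt column exists.
  def toResultRow(
      schema: IndexedSeq[String],
      corruptColumn: String,
      partial: Option[IndexedSeq[Any]],
      badRecord: () => String): IndexedSeq[Any] = {
    val corruptIdx = schema.indexOf(corruptColumn)
    val dataFields = schema.filterNot(_ == corruptColumn)
    if (corruptIdx >= 0) {
      val result = Array.fill[Any](schema.length)(null)
      var i = 0
      while (i < dataFields.length) {
        // Position i in the partial row corresponds to dataFields(i) in the full schema.
        result(schema.indexOf(dataFields(i))) = partial.map(_(i)).orNull
        i += 1
      }
      result(corruptIdx) = badRecord()
      result.toIndexedSeq
    } else {
      // No corrupt column: return the partial result, or an all-null row.
      partial.getOrElse(IndexedSeq.fill(schema.length)(null))
    }
  }
}
```

Because `badRecord` is a thunk, the raw input is only materialized when there is a column to store it in, which is the same reason the nit suggests `(row, _)` in the branch that never uses it.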
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106840126 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class FailureSafeParser[IN]( +func: IN => Seq[InternalRow], --- End diff -- Rename it? `rawParser`?
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106840034 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -382,11 +383,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord) +val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) val createParser = CreateJacksonParser.string _ val parsed = jsonDataset.rdd.mapPartitions { iter => - val parser = new JacksonParser(schema, parsedOptions) - iter.flatMap(parser.parse(_, createParser, UTF8String.fromString)) + val rawParser = new JacksonParser(dataSchema, parsedOptions) + val parser = new FailureSafeParser[String]( +input => rawParser.parse(input, createParser, UTF8String.fromString), +parsedOptions.parseMode, +schema, --- End diff -- How about passing `dataSchema` to `FailureSafeParser`? It could simplify the code in `FailureSafeParser`.
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106838506 --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R --- @@ -1354,9 +1354,8 @@ test_that("column functions", { # passing option df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}"))) schema2 <- structType(structField("date", "date")) - expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))), -error = function(e) { stop(e) }), - paste0(".*(java.lang.NumberFormatException: For input string:).*")) + s <- collect(select(df, from_json(df$col, schema2))) + expect_equal(s[[1]][[1]], NA) --- End diff -- or a bug fix?
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106838434 --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R --- @@ -1354,9 +1354,8 @@ test_that("column functions", { # passing option df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}"))) schema2 <- structType(structField("date", "date")) - expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))), -error = function(e) { stop(e) }), - paste0(".*(java.lang.NumberFormatException: For input string:).*")) + s <- collect(select(df, from_json(df$col, schema2))) + expect_equal(s[[1]][[1]], NA) --- End diff -- This also sounds like a behavior change. Could you add another test case here to trigger the exception?
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106806437 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala --- @@ -233,81 +187,39 @@ class UnivocityParser( * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). */ - def parse(input: String): Option[InternalRow] = convert(tokenizer.parseLine(input)) - - private def convert(tokens: Array[String]): Option[InternalRow] = { -convertWithParseMode(tokens) { tokens => - var i: Int = 0 - while (i < tokenIndexArr.length) { -// It anyway needs to try to parse since it decides if this row is malformed -// or not after trying to cast in `DROPMALFORMED` mode even if the casted -// value is not stored in the row. -val from = tokenIndexArr(i) -val to = rowIndexArr(i) -val value = valueConverters(from).apply(tokens(from)) -if (i < requiredSchema.length) { - row(to) = value -} -i += 1 - } - row -} - } - - private def convertWithParseMode( - tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = { -if (options.dropMalformed && dataSchema.length != tokens.length) { - if (numMalformedRecords < options.maxMalformedLogPerPartition) { -logWarning(s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") - } - if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { -logWarning( - s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + -"found on this partition. Malformed records from now on will not be logged.") + def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) + + private def convert(tokens: Array[String]): InternalRow = { +if (tokens.length != schema.length) { + // If the number of tokens doesn't match the schema, we should treat it as a malformed record. 
+ // However, we still have chance to parse some of the tokens, by adding extra null tokens in + // the tail if the number is smaller, or by dropping extra tokens if the number is larger. + val checkedTokens = if (schema.length > tokens.length) { +tokens ++ new Array[String](schema.length - tokens.length) + } else { +tokens.take(schema.length) } - numMalformedRecords += 1 - None -} else if (options.failFast && dataSchema.length != tokens.length) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: " + -s"${tokens.mkString(options.delimiter.toString)}") -} else { - // If a length of parsed tokens is not equal to expected one, it makes the length the same - // with the expected. If the length is shorter, it adds extra tokens in the tail. - // If longer, it drops extra tokens. - // - // TODO: Revisit this; if a length of tokens does not match an expected length in the schema, - // we probably need to treat it as a malformed record. - // See an URL below for related discussions: - // https://github.com/apache/spark/pull/16928#discussion_r102657214 - val checkedTokens = if (options.permissive && dataSchema.length != tokens.length) { -if (dataSchema.length > tokens.length) { - tokens ++ new Array[String](dataSchema.length - tokens.length) -} else { - tokens.take(dataSchema.length) + def getPartialResult(): Option[InternalRow] = { +try { + Some(convert(checkedTokens)) +} catch { + case _: BadRecordException => None } - } else { -tokens } - + throw BadRecordException( +() => getCurrentInput(), +getPartialResult, +new RuntimeException("Malformed CSV record")) --- End diff -- yes
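In the new CSV path, `BadRecordException` is constructed with two function arguments (`() => getCurrentInput()` and `getPartialResult`) plus a cause, so the raw record and the partial result are only computed if the chosen parse mode asks for them. A minimal sketch of that laziness, assuming a hypothetical `BadRecordError` modeled on those three arguments and a toy comma-separated integer format (not Spark's actual classes):

```scala
import scala.util.Try

// Hypothetical analogue of BadRecordException: record and partial result are thunks.
final case class BadRecordError(
    record: () => String,
    partialResult: () => Option[Seq[Int]],
    cause: Throwable) extends Exception(cause)

object LazyBadRecord {
  var partialEvaluations = 0 // counts how often the partial-result thunk actually runs

  // Parses comma-separated ints, expecting exactly `expected` columns.
  def convert(line: String, expected: Int): Seq[Int] = {
    val tokens = line.split(",", -1).toSeq
    if (tokens.length != expected) {
      throw BadRecordError(
        () => line,
        () => {
          partialEvaluations += 1
          // Keep at most `expected` tokens; give up (None) if any of them is not an int.
          Try(tokens.take(expected).map(_.trim.toInt)).toOption
        },
        new RuntimeException("Malformed CSV record"))
    }
    tokens.map(_.trim.toInt)
  }
}
```

A DROPMALFORMED-style caller can simply discard the exception without ever paying for the partial parse, while a PERMISSIVE-style caller invokes `partialResult()` once.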
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106768186 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class FailureSafeParser[IN]( +func: IN => Seq[InternalRow], +mode: String, +schema: StructType, +columnNameOfCorruptRecord: String) { + + private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) + private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) + private val resultRow = new GenericInternalRow(schema.length) + + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { +if (corruptFieldIndex.isDefined) { + (row, badRecord) => { +for ((f, i) <- actualSchema.zipWithIndex) { + resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull +} +resultRow(corruptFieldIndex.get) = badRecord() +resultRow + } +} else { + (row, badRecord) => row.getOrElse { +for (i <- schema.indices) resultRow.setNullAt(i) --- End diff -- I ran some tests with the codes below to help. 
```scala
object ForWhile {
  def forloop = {
    val l = Array[Int](1,2,3)
    for (i <- l) {
    }
  }
  def whileloop = {
    val arr = Array[Int](1,2,3)
    var idx = 0
    while(idx < arr.length) {
      idx += 1
    }
  }
}
```
```
Compiled from "ForWhile.scala"
public final class ForWhile {
  public static void whileloop();
    Code:
       0: getstatic #16 // Field ForWhile$.MODULE$:LForWhile$;
       3: invokevirtual #18 // Method ForWhile$.whileloop:()V
       6: return

  public static void forloop();
    Code:
       0: getstatic #16 // Field ForWhile$.MODULE$:LForWhile$;
       3: invokevirtual #21 // Method ForWhile$.forloop:()V
       6: return
}
Compiled from "ForWhile.scala"
public final class ForWhile$ {
  public static final ForWhile$ MODULE$;

  public static {};
    Code:
       0: new #2 // class ForWhile$
       3: invokespecial #12 // Method "<init>":()V
       6: return

  public void forloop();
    Code:
       0: getstatic #18 // Field scala/Array$.MODULE$:Lscala/Array$;
       3: getstatic #23 // Field scala/Predef$.MODULE$:Lscala/Predef$;
       6: iconst_3
       7: newarray int
       9: dup
      10: iconst_0
      11: iconst_1
      12: iastore
      13: dup
      14: iconst_1
      15: iconst_2
      16: iastore
      17: dup
      18: iconst_2
      19: iconst_3
      20: iastore
      21: invokevirtual #27 // Method scala/Predef$.wrapIntArray:([I)Lscala/collection/mutable/WrappedArray;
      24: getstatic #32 // Field scala/reflect/ClassTag$.MODULE$:Lscala/reflect/ClassTag$;
      27: invokevirtual #36 // Method scala/reflect/ClassTag$.Int:()Lscala/reflect/ClassTag;
      30: invokevirtual #40 // Method scala/Array$.apply:(Lscala/collection/Seq;Lscala/reflect/ClassTag;)Ljava/lang/Object;
      33: checkcast #42 // class "[I"
      36: astore_1
      37: getstatic #23 // Field scala/Predef$.MODULE$:Lscala/Predef$;
      40: aload_1
      41:
```
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106767486 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class FailureSafeParser[IN]( +func: IN => Seq[InternalRow], +mode: String, +schema: StructType, +columnNameOfCorruptRecord: String) { + + private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) + private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) + private val resultRow = new GenericInternalRow(schema.length) + + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { +if (corruptFieldIndex.isDefined) { + (row, badRecord) => { +for ((f, i) <- actualSchema.zipWithIndex) { + resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull +} +resultRow(corruptFieldIndex.get) = badRecord() +resultRow + } +} else { + (row, badRecord) => row.getOrElse { +for (i <- schema.indices) resultRow.setNullAt(i) --- End diff -- I ran some tests. I can see boxing/unboxing here, which does not seem to happen in a `while` loop.
```scala
object ForWhile {
  def forloop = {
    val l = Seq(1,2,3).indices
    for (i <- l) println(i)
  }
}
```
```
2: invokestatic #46 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
```
```
4: invokestatic #37 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
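To make the comparison concrete, here is a minimal sketch of the two loop shapes being discussed for nulling out a row. It is Spark-free by assumption: a plain `Array[Any]` stands in for `GenericInternalRow` and `row(i) = null` stands in for `setNullAt(i)`; the object and method names are hypothetical. Both methods produce the same result. Whether the `for` version actually boxes depends on the Scala version and on the loop body, so compiling it and inspecting the class with `javap -c`, as done in the comment, remains the reliable check.

```scala
// Hypothetical, Spark-free stand-in: an Array[Any] plays the role of GenericInternalRow
// and `row(i) = null` plays the role of `setNullAt(i)`.
object NullOutRow {
  // Shape used in the diff: a for-comprehension over a Range, which desugars to
  // `row.indices.foreach(i => ...)`, i.e. one closure call per column.
  def nullOutFor(row: Array[Any]): Array[Any] = {
    for (i <- row.indices) row(i) = null
    row
  }

  // Shape suggested in review: a plain counter and a while loop,
  // with no closure and no generic Function1 call on the hot path.
  def nullOutWhile(row: Array[Any]): Array[Any] = {
    var i = 0
    while (i < row.length) {
      row(i) = null
      i += 1
    }
    row
  }
}
```

The `while` form is more verbose, but it keeps the per-record path free of closure dispatch, which is why it tends to be preferred in code that runs once per parsed row.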
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106650196 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class FailureSafeParser[IN]( +func: IN => Seq[InternalRow], +mode: String, +schema: StructType, +columnNameOfCorruptRecord: String) { + + private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) + private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) + private val resultRow = new GenericInternalRow(schema.length) + + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { +if (corruptFieldIndex.isDefined) { + (row, badRecord) => { +for ((f, i) <- actualSchema.zipWithIndex) { --- End diff -- Here too. 
I think this case applies to https://github.com/databricks/scala-style-guide#traversal-and-zipwithindex .
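Following the style-guide section linked above, the `zipWithIndex` traversal in `toResultRow` could be rewritten as a `while` loop. The sketch below is Spark-free and hypothetical: rows are `Array[Any]`, `schemaFields` is the full output schema (including the corrupt-record column), and `dataFields` plays the role of the diff's `actualSchema`. Precomputing the target positions once, instead of calling `schema.fieldIndex(f.name)` for every row, is an extra design choice of this sketch, not something the diff does.

```scala
// Hypothetical, Spark-free sketch: rows are Array[Any], `schemaFields` is the full output
// schema (including the corrupt-record column), and `dataFields` are the columns the raw
// parser produces, in order (the diff's `actualSchema`).
object CopyPartialRow {
  // Design choice (an assumption, not from the diff): resolve each data column's position
  // in the output schema once, instead of calling `schema.fieldIndex(f.name)` per row.
  def targetIndices(schemaFields: Array[String], dataFields: Array[String]): Array[Int] =
    dataFields.map(name => schemaFields.indexOf(name))

  // Replaces `for ((f, i) <- actualSchema.zipWithIndex)` with a while loop: no tuple
  // allocation per column and no closure. Writes null everywhere when nothing was parsed.
  def copyInto(partial: Option[Array[Any]], targets: Array[Int], result: Array[Any]): Array[Any] = {
    val values = partial.orNull
    var i = 0
    while (i < targets.length) {
      result(targets(i)) = if (values == null) null else values(i)
      i += 1
    }
    result
  }
}
```

For a schema `("a", "_corrupt_record", "b")` the targets are `Array(0, 2)`, so a partial row `(1, "x")` lands in slots 0 and 2 and the corrupt-record slot is left for the caller to fill.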
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106664841 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class FailureSafeParser[IN]( +func: IN => Seq[InternalRow], +mode: String, +schema: StructType, +columnNameOfCorruptRecord: String) { + + private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) + private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) + private val resultRow = new GenericInternalRow(schema.length) + + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { +if (corruptFieldIndex.isDefined) { + (row, badRecord) => { +for ((f, i) <- actualSchema.zipWithIndex) { + resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull +} +resultRow(corruptFieldIndex.get) = badRecord() +resultRow + } +} else { + (row, badRecord) => row.getOrElse { +for (i <- schema.indices) resultRow.setNullAt(i) +resultRow + } +} + } + + def parse(input: IN): Iterator[InternalRow] = { +try { + func(input).toIterator.map(row => toResultRow(Some(row), () => null)) +} catch { + case e: BadRecordException if ParseModes.isPermissiveMode(mode) => +Iterator(toResultRow(e.partialResult(), e.record)) + case _: BadRecordException if ParseModes.isDropMalformedMode(mode) => +Iterator.empty + // If the parse mode is FAIL FAST, do not catch the exception. +} + } +} + +case class BadRecordException( +record: () => UTF8String, +partialResult: () => Option[InternalRow], --- End diff -- I think we could leave some comments for `partialResult` too.
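As an illustration of the kind of comments requested for `partialResult`, the sketch below documents each field of a Spark-free analogue of the exception and shows that both thunks are only evaluated on demand. The names `BadRecordSketch` and `BadRecordDemo` are hypothetical; `String` replaces `UTF8String` and `Array[Any]` replaces `InternalRow` so that it compiles on its own. The three-parameter shape follows the `throw BadRecordException(..., getPartialResult, new RuntimeException(...))` call in the CSV diff.

```scala
// Hypothetical, Spark-free analogue of the exception in the diff: UTF8String is replaced
// by String and InternalRow by Array[Any] so the sketch compiles on its own.
/**
 * Thrown by a format-specific parser when it cannot parse a record.
 *
 * @param record        lazily produces the raw input of the bad record; only evaluated
 *                      when the caller needs it (e.g. to fill a corrupt-record column).
 * @param partialResult lazily produces whatever could be parsed from the bad record,
 *                      or None if nothing could be recovered.
 * @param cause         the underlying parse failure.
 */
final case class BadRecordSketch(
    record: () => String,
    partialResult: () => Option[Array[Any]],
    cause: Throwable) extends Exception(cause)

object BadRecordDemo {
  // Counts how many times the partial result was actually computed.
  var partialEvaluations = 0

  // Toy parser that always fails; the partial result is the input length.
  def failingParse(line: String): Nothing = throw BadRecordSketch(
    () => line,
    () => { partialEvaluations += 1; Some(Array[Any](line.length)) },
    new RuntimeException("Malformed record"))
}
```

Passing thunks rather than values means a caller that drops the record (or fails fast) never pays for re-parsing or for materializing the raw string.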
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106664412 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class FailureSafeParser[IN]( +func: IN => Seq[InternalRow], +mode: String, +schema: StructType, +columnNameOfCorruptRecord: String) { + + private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) + private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) + private val resultRow = new GenericInternalRow(schema.length) + + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { --- End diff -- Maybe we should explain what happens within `toResultRow`. It would also be fine to do this in the follow-up.
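What `toResultRow` does can be summarized with a small Spark-free sketch. Everything here is hypothetical: rows are `Array[Any]`, `corruptIndex` stands in for `corruptFieldIndex`, and the data columns are assumed to appear in schema order with the corrupt-record column removed, which is how `actualSchema` is built in the diff. One deliberate difference: the diff reuses a single mutable `resultRow` per parser to avoid allocation, while this sketch allocates a fresh array per call for clarity.

```scala
// Hypothetical, Spark-free sketch of what the `toResultRow` closure in the diff does.
// Rows are Array[Any]; `corruptIndex` is the position of the corrupt-record column in
// the output schema, if the user asked for one.
object ToResultRowSketch {
  def make(
      schemaLength: Int,
      corruptIndex: Option[Int]): (Option[Array[Any]], () => String) => Array[Any] = {
    corruptIndex match {
      case Some(ci) =>
        // Output has a corrupt-record column: copy the parsed (possibly partial) values
        // into every other slot, nulls if nothing was parsed, and store the raw record at `ci`.
        (partial, badRecord) => {
          val out = new Array[Any](schemaLength)
          var src = 0
          var dst = 0
          while (dst < schemaLength) {
            if (dst != ci) {
              out(dst) = partial.map(_(src)).orNull
              src += 1
            }
            dst += 1
          }
          out(ci) = badRecord()
          out
        }
      case None =>
        // No corrupt-record column: return the partial row as-is, or an all-null row.
        (partial, _) => partial.getOrElse(new Array[Any](schemaLength))
    }
  }
}
```

For well-formed records the diff passes `() => null` as the bad record, so with this shape the corrupt-record column is simply null for good rows.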
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106655740 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala --- @@ -233,81 +187,39 @@ class UnivocityParser( * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). */ - def parse(input: String): Option[InternalRow] = convert(tokenizer.parseLine(input)) - - private def convert(tokens: Array[String]): Option[InternalRow] = { -convertWithParseMode(tokens) { tokens => - var i: Int = 0 - while (i < tokenIndexArr.length) { -// It anyway needs to try to parse since it decides if this row is malformed -// or not after trying to cast in `DROPMALFORMED` mode even if the casted -// value is not stored in the row. -val from = tokenIndexArr(i) -val to = rowIndexArr(i) -val value = valueConverters(from).apply(tokens(from)) -if (i < requiredSchema.length) { - row(to) = value -} -i += 1 - } - row -} - } - - private def convertWithParseMode( - tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = { -if (options.dropMalformed && dataSchema.length != tokens.length) { - if (numMalformedRecords < options.maxMalformedLogPerPartition) { -logWarning(s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}") - } - if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) { -logWarning( - s"More than ${options.maxMalformedLogPerPartition} malformed records have been " + -"found on this partition. Malformed records from now on will not be logged.") + def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) + + private def convert(tokens: Array[String]): InternalRow = { +if (tokens.length != schema.length) { + // If the number of tokens doesn't match the schema, we should treat it as a malformed record. 
+ // However, we still have chance to parse some of the tokens, by adding extra null tokens in + // the tail if the number is smaller, or by dropping extra tokens if the number is larger. + val checkedTokens = if (schema.length > tokens.length) { +tokens ++ new Array[String](schema.length - tokens.length) + } else { +tokens.take(schema.length) } - numMalformedRecords += 1 - None -} else if (options.failFast && dataSchema.length != tokens.length) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: " + -s"${tokens.mkString(options.delimiter.toString)}") -} else { - // If a length of parsed tokens is not equal to expected one, it makes the length the same - // with the expected. If the length is shorter, it adds extra tokens in the tail. - // If longer, it drops extra tokens. - // - // TODO: Revisit this; if a length of tokens does not match an expected length in the schema, - // we probably need to treat it as a malformed record. - // See an URL below for related discussions: - // https://github.com/apache/spark/pull/16928#discussion_r102657214 - val checkedTokens = if (options.permissive && dataSchema.length != tokens.length) { -if (dataSchema.length > tokens.length) { - tokens ++ new Array[String](dataSchema.length - tokens.length) -} else { - tokens.take(dataSchema.length) + def getPartialResult(): Option[InternalRow] = { +try { + Some(convert(checkedTokens)) +} catch { + case _: BadRecordException => None } - } else { -tokens } - + throw BadRecordException( +() => getCurrentInput(), +getPartialResult, +new RuntimeException("Malformed CSV record")) --- End diff -- I see. So, `BadRecordException` contains the information about which record is malformed so that users can check this. Did I understand correctly?
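The `checkedTokens` step in the CSV diff can be isolated as a tiny helper. In the new code a length mismatch always makes the record malformed, but the fitted tokens are still used to build the partial result. The object and method names below are hypothetical; the padding and truncation expressions mirror the diff.

```scala
// Hypothetical helper mirroring the `checkedTokens` logic in the CSV diff: force the token
// array to the schema's length so that as many columns as possible can still be parsed.
object CheckedTokens {
  def fit(tokens: Array[String], schemaLength: Int): Array[String] =
    if (schemaLength > tokens.length) {
      // Too few tokens: pad the tail with nulls (a fresh Array[String] is null-filled).
      tokens ++ new Array[String](schemaLength - tokens.length)
    } else {
      // Too many (or exactly enough): drop the extras.
      tokens.take(schemaLength)
    }
}
```

For a three-column schema, `Array("1", "2")` becomes `Array("1", "2", null)` and `Array("1", "2", "3", "4")` becomes `Array("1", "2", "3")`.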
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106653556 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -55,108 +52,6 @@ class JacksonParser( private val factory = new JsonFactory() options.setJacksonOptions(factory) - private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) - - private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) - corruptFieldIndex.foreach { corrFieldIndex => -require(schema(corrFieldIndex).dataType == StringType) -require(schema(corrFieldIndex).nullable) - } - - @transient - private[this] var isWarningPrinted: Boolean = false - - @transient - private def printWarningForMalformedRecord(record: () => UTF8String): Unit = { -def sampleRecord: String = { - if (options.wholeFile) { -"" - } else { -s"Sample record: ${record()}\n" - } -} - -def footer: String = { --- End diff -- It seems the warnings are no longer printed for either format here. Should we consider printing shortened versions of the messages for CSV and JSON?
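Printing shortened warnings, as suggested above, could look roughly like the sketch below. It is loosely modeled on the per-partition cap in the removed CSV code (`maxMalformedLogPerPartition`). The class name, the message wording, and collecting messages in a buffer instead of calling `logWarning` are all assumptions made so that the sketch runs on its own.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of a capped, shortened warning for malformed records, loosely modeled
// on the removed CSV logic (`maxMalformedLogPerPartition`). Messages are collected into a
// buffer instead of a logger so the behavior is easy to check.
final class MalformedRecordWarner(maxLogged: Int, format: String) {
  val messages = ArrayBuffer.empty[String]
  private var seen = 0

  def warn(): Unit = {
    if (seen < maxLogged) {
      // Short message: no record sample, just the format.
      messages += s"Found a malformed $format record."
    } else if (seen == maxLogged) {
      // Emitted exactly once, when the cap is first exceeded.
      messages += s"More than $maxLogged malformed $format records found on this partition; " +
        "further ones will not be logged."
    }
    seen += 1
  }
}
```

One warner per partition keeps the log volume bounded no matter how many bad records a partition contains.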
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106648055 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class FailureSafeParser[IN]( +func: IN => Seq[InternalRow], +mode: String, +schema: StructType, +columnNameOfCorruptRecord: String) { + + private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) + private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) + private val resultRow = new GenericInternalRow(schema.length) + + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { +if (corruptFieldIndex.isDefined) { + (row, badRecord) => { +for ((f, i) <- actualSchema.zipWithIndex) { + resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull +} +resultRow(corruptFieldIndex.get) = badRecord() +resultRow + } +} else { + (row, badRecord) => row.getOrElse { +for (i <- schema.indices) resultRow.setNullAt(i) --- End diff -- As far as I understand, the last commit focuses on simplification. I like that, but I think we should maybe use `while` here instead, unless we are very sure that the bytecode is virtually the same or more efficient on the critical path. This change would not hurt the simplification much here.
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106593549 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -65,7 +65,7 @@ private[sql] class JSONOptions( val allowBackslashEscapingAnyCharacter = parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false) val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName) - private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") + val parseMode = parameters.getOrElse("mode", "PERMISSIVE") --- End diff -- yea this can be a good follow-up
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106570171 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -65,7 +65,7 @@ private[sql] class JSONOptions( val allowBackslashEscapingAnyCharacter = parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false) val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName) - private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") + val parseMode = parameters.getOrElse("mode", "PERMISSIVE") --- End diff -- How about creating an enum, like what we are doing for `SaveMode`?
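An enum-style parse mode, as proposed above, might look like the following sealed hierarchy in Scala. This is a hypothetical sketch, not Spark's API: the type and member names are illustrative, and falling back to PERMISSIVE for unrecognized strings is an assumption chosen to match the option's default value (`parameters.getOrElse("mode", "PERMISSIVE")`) shown in the diff.

```scala
// Hypothetical sketch of the enum-style parse mode suggested above (compare `SaveMode`).
// Names are illustrative; falling back to PERMISSIVE for unknown strings is an assumption
// that matches the option's default value in the diff, not a confirmed behavior.
sealed abstract class ParseMode(val name: String)

object ParseMode {
  case object Permissive extends ParseMode("PERMISSIVE")
  case object DropMalformed extends ParseMode("DROPMALFORMED")
  case object FailFast extends ParseMode("FAILFAST")

  private val all = Seq(Permissive, DropMalformed, FailFast)

  // Case-insensitive lookup of the user-supplied `mode` option.
  def fromString(mode: String): ParseMode =
    all.find(_.name.equalsIgnoreCase(mode)).getOrElse(Permissive)
}
```

With a sealed type, a `match` over the mode in the parser is checked for exhaustiveness by the compiler, which string comparisons such as `ParseModes.isPermissiveMode(mode)` cannot offer.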
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17315#discussion_r106566183 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -391,9 +288,9 @@ class JacksonParser( case token => // We cannot parse this token based on the given data type. So, we throw a - // SparkSQLJsonProcessingException and this exception will be caught by + // SparkSQLRuntimeException and this exception will be caught by --- End diff -- `RuntimeException`?
[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/17315 [SPARK-19949][SQL] unify bad record handling in CSV and JSON

## What changes were proposed in this pull request?

Currently, JSON and CSV have exactly the same logic for handling bad records. This PR abstracts that logic and moves it to an upper level to reduce code duplication.

The overall idea is that the JSON and CSV parsers throw a BadRecordException, and the upper level, FailureSafeParser, handles bad records according to the parse mode.

Behavior changes:
1. In PERMISSIVE mode, if the number of tokens doesn't match the schema, the CSV parser previously treated the record as legal and parsed as many tokens as possible. After this PR, we treat it as an illegal record and put the raw record string in a special column, but we still parse as many tokens as possible.
2. All logging is removed, as it is not very useful in practice.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark bad-record2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17315.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #17315

commit adfde77125eb31b262a2f010851beef2b872e1e8
Author: Wenchen Fan
Date: 2017-03-16T13:52:59Z

unify bad record handling in CSV and JSON
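The overall idea in the description (format parsers only throw, and one wrapper applies the parse mode) can be sketched without Spark. Everything below is hypothetical and simplified: rows are `Array[Any]`, the mode is a plain string as in the diff's `mode: String`, and PERMISSIVE here returns the partial row (or an all-null row) without the corrupt-record column that `FailureSafeParser` would also fill in.

```scala
// Hypothetical, Spark-free sketch of the idea in the description: the format-specific
// parser only throws on bad input, and one wrapper applies the parse mode.
// Rows are Array[Any]; every name here is illustrative, not Spark's API.
final case class MalformedInput(raw: String, partial: Option[Array[Any]])
  extends RuntimeException(s"Malformed record: $raw")

final class FailureSafeSketch(
    rawParser: String => Seq[Array[Any]],
    mode: String,
    schemaLength: Int) {

  def parse(input: String): Iterator[Array[Any]] =
    try {
      rawParser(input).iterator
    } catch {
      // PERMISSIVE: keep the record with whatever could be parsed, or an all-null row.
      case e: MalformedInput if mode == "PERMISSIVE" =>
        Iterator(e.partial.getOrElse(new Array[Any](schemaLength)))
      // DROPMALFORMED: skip the record.
      case _: MalformedInput if mode == "DROPMALFORMED" =>
        Iterator.empty
      // FAILFAST (and, in this sketch, any other mode string): no case matches,
      // so the exception propagates to the caller.
    }
}

object FailureSafeDemo {
  // Toy raw parser: expects exactly two comma-separated fields and, on failure,
  // reports the first field as a partial result.
  def twoFields(line: String): Seq[Array[Any]] = {
    val parts = line.split(",", -1)
    if (parts.length != 2) throw MalformedInput(line, Some(Array[Any](parts(0), null)))
    Seq(Array[Any](parts(0), parts(1)))
  }
}
```

With this split, `twoFields` knows nothing about modes, so adding another format only means writing another throwing parser; the mode handling stays in one place.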