[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17315





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107055987
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+    rawParser: IN => Seq[InternalRow],
+    mode: String,
+    schema: StructType,
+    columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+  private val nullResult = new GenericInternalRow(schema.length)
+
+  // This function takes 2 parameters: an optional partial result, and the bad record. If the given
+  // schema doesn't contain a field for corrupted record, we just return the partial result or a
+  // row with all fields null. If the given schema contains a field for corrupted record, we will
+  // set the bad record to this field, and set other fields according to the partial result or null.
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
+    if (corruptFieldIndex.isDefined) {
+      (row, badRecord) => {
+        var i = 0
+        while (i < actualSchema.length) {
+          val f = actualSchema(i)
+          resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull
+          i += 1
+        }
+        resultRow(corruptFieldIndex.get) = badRecord()
+        resultRow
+      }
+    } else {
+      (row, _) => row.getOrElse(nullResult)
+    }
+  }
+
+  def parse(input: IN): Iterator[InternalRow] = {
+    try {
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

we should add tests in a follow-up





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107055187
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
+  def parse(input: IN): Iterator[InternalRow] = {
+    try {
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

uh... this is kind of tricky. 





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107053686
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
+  def parse(input: IN): Iterator[InternalRow] = {
+    try {
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

`ParseModes.isPermissiveMode` returns true if the mode string is invalid
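For readers following the thread: a minimal sketch of that fallback behavior (hypothetical object and names, not the actual `ParseModes` source):

```scala
// Sketch only: illustrates the fallback described above, where an unrecognized
// mode string is treated as PERMISSIVE rather than as an error.
object ParseModesSketch {
  private val validModes = Set("PERMISSIVE", "DROPMALFORMED", "FAILFAST")

  def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == "DROPMALFORMED"
  def isFailFastMode(mode: String): Boolean = mode.toUpperCase == "FAILFAST"

  // An invalid mode (e.g. the typo "permissve") falls back to permissive, so the
  // final `case e: BadRecordException => throw e.cause` branch is only reached
  // when the mode is a correctly spelled FAILFAST.
  def isPermissiveMode(mode: String): Boolean =
    if (validModes.contains(mode.toUpperCase)) mode.toUpperCase == "PERMISSIVE" else true
}
```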





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107020700
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
+  def parse(input: IN): Iterator[InternalRow] = {
+    try {
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

I just did a re-check. This part in CSV is kind of messy; the existing behavior is inconsistent, without any clear rule. First of all, we should have test cases.
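A rough sketch of the kind of coverage being asked for, assuming an active `SparkSession` named `spark` and a CSV file at `path` containing one well-formed and one malformed line (illustrative names only, not the PR's tests):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object BadRecordModeCoverageSketch {
  def run(spark: SparkSession, path: String): Unit = {
    val schema = new StructType().add("a", IntegerType).add("b", IntegerType)

    // PERMISSIVE (the default): the malformed line is kept, with unparseable fields set to null.
    val permissive = spark.read.schema(schema).option("mode", "PERMISSIVE").csv(path)

    // DROPMALFORMED: the malformed line is silently dropped.
    val dropMalformed = spark.read.schema(schema).option("mode", "DROPMALFORMED").csv(path)
    assert(dropMalformed.count() < permissive.count())

    // FAILFAST: reading should fail on the first malformed line.
    val failed =
      try { spark.read.schema(schema).option("mode", "FAILFAST").csv(path).collect(); false }
      catch { case _: Throwable => true }
    assert(failed, "FAILFAST should fail on malformed input")
  }
}
```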





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107014888
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -233,81 +187,39 @@ class UnivocityParser(
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
    */
-  def parse(input: String): Option[InternalRow] = convert(tokenizer.parseLine(input))
-
-  private def convert(tokens: Array[String]): Option[InternalRow] = {
-    convertWithParseMode(tokens) { tokens =>
-      var i: Int = 0
-      while (i < tokenIndexArr.length) {
-        // It anyway needs to try to parse since it decides if this row is malformed
-        // or not after trying to cast in `DROPMALFORMED` mode even if the casted
-        // value is not stored in the row.
-        val from = tokenIndexArr(i)
-        val to = rowIndexArr(i)
-        val value = valueConverters(from).apply(tokens(from))
-        if (i < requiredSchema.length) {
-          row(to) = value
-        }
-        i += 1
-      }
-      row
-    }
-  }
-
-  private def convertWithParseMode(
-      tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = {
-    if (options.dropMalformed && dataSchema.length != tokens.length) {
-      if (numMalformedRecords < options.maxMalformedLogPerPartition) {
-        logWarning(s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}")
-      }
-      if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) {
-        logWarning(
-          s"More than ${options.maxMalformedLogPerPartition} malformed records have been " +
-            "found on this partition. Malformed records from now on will not be logged.")
+  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
+
+  private def convert(tokens: Array[String]): InternalRow = {
+    if (tokens.length != schema.length) {
+      // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
+      // However, we still have chance to parse some of the tokens, by adding extra null tokens in
+      // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
+      val checkedTokens = if (schema.length > tokens.length) {
+        tokens ++ new Array[String](schema.length - tokens.length)
+      } else {
+        tokens.take(schema.length)
       }
-      numMalformedRecords += 1
-      None
-    } else if (options.failFast && dataSchema.length != tokens.length) {
-      throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
-        s"${tokens.mkString(options.delimiter.toString)}")
-    } else {
-      // If a length of parsed tokens is not equal to expected one, it makes the length the same
-      // with the expected. If the length is shorter, it adds extra tokens in the tail.
-      // If longer, it drops extra tokens.
-      //
-      // TODO: Revisit this; if a length of tokens does not match an expected length in the schema,
-      // we probably need to treat it as a malformed record.
-      // See an URL below for related discussions:
-      // https://github.com/apache/spark/pull/16928#discussion_r102657214
-      val checkedTokens = if (options.permissive && dataSchema.length != tokens.length) {
-        if (dataSchema.length > tokens.length) {
-          tokens ++ new Array[String](dataSchema.length - tokens.length)
-        } else {
-          tokens.take(dataSchema.length)
+      def getPartialResult(): Option[InternalRow] = {
--- End diff --

Nit: `getPartialResult()` -> `getPartialResult`
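The nit follows the usual Scala convention: a parameterless method with no side effects is declared without parentheses. A tiny illustration (hypothetical class):

```scala
class ConventionSketch {
  // Discouraged for a pure accessor: empty parens suggest a side effect.
  def getPartialResultWithParens(): Option[Int] = Some(1)

  // Preferred: no parens for a side-effect-free, parameterless method.
  def getPartialResult: Option[Int] = Some(1)
}
```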





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107014069
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -46,85 +46,39 @@ class UnivocityParser(
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any
 
-  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
-  corruptFieldIndex.foreach { corrFieldIndex =>
-    require(schema(corrFieldIndex).dataType == StringType)
-    require(schema(corrFieldIndex).nullable)
-  }
-
-  private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
-
   private val tokenizer = new CsvParser(options.asParserSettings)
 
-  private var numMalformedRecords = 0
-
   private val row = new GenericInternalRow(requiredSchema.length)
 
-  // In `PERMISSIVE` parse mode, we should be able to put the raw malformed row into the field
-  // specified in `columnNameOfCorruptRecord`. The raw input is retrieved by this method.
-  private def getCurrentInput(): String = tokenizer.getContext.currentParsedContent().stripLineEnd
+  // Retrieve the raw record string.
+  private def getCurrentInput(): UTF8String = {
--- End diff --

Nit: `getCurrentInput()` -> `getCurrentInput`





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107013706
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -435,14 +442,20 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }
 
     verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
+    val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
--- End diff --

Nit: `dataSchema` -> `actualSchema`? Be consistent with what we did in the other place?





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107013316
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
+  def parse(input: IN): Iterator[InternalRow] = {
+    try {
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

We need a test case to cover it?
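A sketch of the kind of test case being requested, runnable in spark-shell against this branch (assumes an active `SparkSession` named `spark`; the malformed second line should surface the underlying cause in FAILFAST mode):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._

val schema = new StructType().add("a", IntegerType)
val lines = Seq("""{"a": 1}""", """{"a":""")               // second record is malformed JSON
val ds = spark.createDataset(lines)(Encoders.STRING)

val failed =
  try { spark.read.schema(schema).option("mode", "FAILFAST").json(ds).collect(); false }
  catch { case _: Throwable => true }
assert(failed, "FAILFAST should rethrow the cause for a malformed record")
```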





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107013197
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
+    if (corruptFieldIndex.isDefined) {
+      (row, badRecord) => {
+        var i = 0
+        while (i < actualSchema.length) {
+          val f = actualSchema(i)
--- End diff --

Nit: `f` -> `from`





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107012038
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -382,11 +383,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }
 
     verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
+    val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
--- End diff --

Nit: `dataSchema` -> `actualSchema`? Be consistent with what we did in the other place?





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r107011371
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
+  def parse(input: IN): Iterator[InternalRow] = {
+    try {
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+    } catch {
+      case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+        Iterator(toResultRow(e.partialResult(), e.record))
+      case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+        Iterator.empty
+      case e: BadRecordException => throw e.cause
--- End diff --

This is `FAIL_FAST_MODE`, if my understanding is correct. Should we issue an error message that mentions `FAILFAST`, as we did before?

This is also a behavior change? If users did not spell the mode string correctly, we used to treat it as `PERMISSIVE` mode. Now, we changed it to `FAILFAST` mode.
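A sketch of the error wrapping being suggested (hypothetical helper, not the PR's code), mirroring the old CSV-only message "Malformed line in FAILFAST mode: ...":

```scala
object FailFastErrorSketch {
  // Re-throw the underlying cause but name the active mode, so users know
  // the failure comes from FAILFAST rather than from a parser bug.
  def failInFailFastMode(record: String, cause: Throwable): Nothing =
    throw new RuntimeException(s"Malformed record in FAILFAST mode: $record", cause)
}
```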





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106852242
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
+      def getPartialResult(): Option[InternalRow] = {
+        try {
+          Some(convert(checkedTokens))
+        } catch {
+          case _: BadRecordException => None
         }
-      } else {
-        tokens
       }
-
+      throw BadRecordException(
+        () => getCurrentInput(),
+        getPartialResult,
+        new RuntimeException("Malformed CSV record"))
+    } else {
       try {
-        Some(convert(checkedTokens))
+        for (i <- requiredSchema.indices) {
--- End diff --

(It seems this one was missed. I am fine if it is new code, but I am worried about the change from `while` to `for`. Might not be a really big deal though.)
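The worry is about the per-record hot path: `for (i <- requiredSchema.indices)` desugars to `Range.foreach` with a closure, while a `while` loop does not. A small illustration (hypothetical fill methods):

```scala
object LoopStyleSketch {
  // `while` form: no Range, no closure allocation on each call.
  def fillWithWhile(dst: Array[Any], src: Array[Any]): Unit = {
    var i = 0
    while (i < src.length) {
      dst(i) = src(i)
      i += 1
    }
  }

  // `for` form: same result, but goes through src.indices.foreach { i => ... }.
  def fillWithFor(dst: Array[Any], src: Array[Any]): Unit = {
    for (i <- src.indices) {
      dst(i) = src(i)
    }
  }
}
```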




[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106847502
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
@@ -113,8 +113,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
-      val parser = new UnivocityParser(dataSchema, requiredSchema, parsedOptions)
-      CSVDataSource(parsedOptions).readFile(conf, file, parser, parsedOptions)
+      val parser = new UnivocityParser(
+        StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
+        StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
+        parsedOptions)
+      CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema)
--- End diff --

We do need it, and the logic is in `readFile`.





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106847415
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
@@ -55,108 +52,6 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
-  private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
-
-  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
-  corruptFieldIndex.foreach { corrFieldIndex =>
-    require(schema(corrFieldIndex).dataType == StringType)
-    require(schema(corrFieldIndex).nullable)
-  }
--- End diff --

This is just a sanity check; it is already done in `DataFrameReader.csv/json` and `JsonFileFormat/CSVFileFormat`.
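For reference, a standalone sketch of that check, based on the two `require`s removed here (hypothetical helper name; per this thread the real check runs on the reader/format side via `verifyColumnNameOfCorruptRecord`):

```scala
import org.apache.spark.sql.types.{StringType, StructType}

object CorruptRecordColumnCheckSketch {
  // If the schema declares a corrupt-record column, it must be a nullable string,
  // matching the two removed `require`s above.
  def verify(schema: StructType, columnNameOfCorruptRecord: String): Unit = {
    schema.getFieldIndex(columnNameOfCorruptRecord).foreach { idx =>
      require(schema(idx).dataType == StringType,
        s"The field for corrupt records must be string type: $columnNameOfCorruptRecord")
      require(schema(idx).nullable,
        s"The field for corrupt records must be nullable: $columnNameOfCorruptRecord")
    }
  }
}
```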





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106842646
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
@@ -113,8 +113,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
     (file: PartitionedFile) => {
      val conf = broadcastedHadoopConf.value.value
-      val parser = new UnivocityParser(dataSchema, requiredSchema, parsedOptions)
-      CSVDataSource(parsedOptions).readFile(conf, file, parser, parsedOptions)
+      val parser = new UnivocityParser(
+        StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
+        StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
+        parsedOptions)
+      CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema)
--- End diff --

We do not need `FailureSafeParser` in this case?





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106840886
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
@@ -55,108 +52,6 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
-  private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
-
-  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
-  corruptFieldIndex.foreach { corrFieldIndex =>
-    require(schema(corrFieldIndex).dataType == StringType)
-    require(schema(corrFieldIndex).nullable)
-  }
--- End diff --

The above check seems to be missing in the new code.





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106840806
  
--- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R ---
@@ -1354,9 +1354,8 @@ test_that("column functions", {
   # passing option
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
   schema2 <- structType(structField("date", "date"))
-  expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
-               error = function(e) { stop(e) }),
-               paste0(".*(java.lang.NumberFormatException: For input string:).*"))
+  s <- collect(select(df, from_json(df$col, schema2)))
+  expect_equal(s[[1]][[1]], NA)
--- End diff --

uh,  I see.





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106840345
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -382,11 +383,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }
 
     verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
+    val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
 
     val createParser = CreateJacksonParser.string _
     val parsed = jsonDataset.rdd.mapPartitions { iter =>
-      val parser = new JacksonParser(schema, parsedOptions)
-      iter.flatMap(parser.parse(_, createParser, UTF8String.fromString))
+      val rawParser = new JacksonParser(dataSchema, parsedOptions)
+      val parser = new FailureSafeParser[String](
+        input => rawParser.parse(input, createParser, UTF8String.fromString),
+        parsedOptions.parseMode,
+        schema,
--- End diff --

we need `schema` anyway, passing `dataSchema` only saves one line in 
`FailureSafeParser`...





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106840206
  
--- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R ---
@@ -1354,9 +1354,8 @@ test_that("column functions", {
   # passing option
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
   schema2 <- structType(structField("date", "date"))
-  expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
-               error = function(e) { stop(e) }),
-               paste0(".*(java.lang.NumberFormatException: For input string:).*"))
+  s <- collect(select(df, from_json(df$col, schema2)))
+  expect_equal(s[[1]][[1]], NA)
--- End diff --

yea it's a minor bug fix, see https://github.com/cloud-fan/spark/pull/4

I'm not sure if it's worth a ticket.
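The behavior change in Scala terms, runnable in spark-shell (assumes an active `SparkSession` named `spark`): with the unified handling, an unparseable date inside `from_json` is expected to yield a null field instead of bubbling up a `NumberFormatException`, which is what the updated R expectation (`NA`) checks.

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val df = Seq("""{"date":"21/10/2014"}""").toDF("col")
val parsed = df.select(from_json($"col", new StructType().add("date", DateType)))
parsed.show()   // expected: one row containing null, mirroring the NA in the R test
```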





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106840169
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+    func: IN => Seq[InternalRow],
+    mode: String,
+    schema: StructType,
+    columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+  private val nullResult = new GenericInternalRow(schema.length)
+
+  // This function takes 2 parameters: an optional partial result, and the bad record. If the given
+  // schema doesn't contain a field for corrupted record, we just return the partial result or a
+  // row with all fields null. If the given schema contains a field for corrupted record, we will
+  // set the bad record to this field, and set other fields according to the partial result or null.
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
+    if (corruptFieldIndex.isDefined) {
+      (row, badRecord) => {
+        var i = 0
+        while (i < actualSchema.length) {
+          val f = actualSchema(i)
+          resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull
+          i += 1
+        }
+        resultRow(corruptFieldIndex.get) = badRecord()
+        resultRow
+      }
+    } else {
+      (row, badRecord) => row.getOrElse(nullResult)
--- End diff --

Nit: `(row, badRecord)` -> `(row, _)`





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106840126
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+    func: IN => Seq[InternalRow],
--- End diff --

rename it? rawParser?





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106840034
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -382,11 +383,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }
 
     verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
+    val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
 
     val createParser = CreateJacksonParser.string _
     val parsed = jsonDataset.rdd.mapPartitions { iter =>
-      val parser = new JacksonParser(schema, parsedOptions)
-      iter.flatMap(parser.parse(_, createParser, UTF8String.fromString))
+      val rawParser = new JacksonParser(dataSchema, parsedOptions)
+      val parser = new FailureSafeParser[String](
+        input => rawParser.parse(input, createParser, UTF8String.fromString),
+        parsedOptions.parseMode,
+        schema,
--- End diff --

How about passing `dataSchema` to `FailureSafeParser`? It could simplify the code in `FailureSafeParser`.





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106838506
  
--- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R ---
@@ -1354,9 +1354,8 @@ test_that("column functions", {
   # passing option
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
   schema2 <- structType(structField("date", "date"))
-  expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
-               error = function(e) { stop(e) }),
-               paste0(".*(java.lang.NumberFormatException: For input string:).*"))
+  s <- collect(select(df, from_json(df$col, schema2)))
+  expect_equal(s[[1]][[1]], NA)
--- End diff --

or a bug fix?





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106838434
  
--- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R ---
@@ -1354,9 +1354,8 @@ test_that("column functions", {
   # passing option
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
   schema2 <- structType(structField("date", "date"))
-  expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
-error = function(e) { stop(e) }),
-   paste0(".*(java.lang.NumberFormatException: For input 
string:).*"))
+  s <- collect(select(df, from_json(df$col, schema2)))
+  expect_equal(s[[1]][[1]], NA)
--- End diff --

This also sounds like a behavior change. Could you add another test case here that triggers the exception?





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106806437
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 ---
@@ -233,81 +187,39 @@ class UnivocityParser(
* Parses a single CSV string and turns it into either one resulting row 
or no row (if the
* the record is malformed).
*/
-  def parse(input: String): Option[InternalRow] = 
convert(tokenizer.parseLine(input))
-
-  private def convert(tokens: Array[String]): Option[InternalRow] = {
-convertWithParseMode(tokens) { tokens =>
-  var i: Int = 0
-  while (i < tokenIndexArr.length) {
-// It anyway needs to try to parse since it decides if this row is 
malformed
-// or not after trying to cast in `DROPMALFORMED` mode even if the 
casted
-// value is not stored in the row.
-val from = tokenIndexArr(i)
-val to = rowIndexArr(i)
-val value = valueConverters(from).apply(tokens(from))
-if (i < requiredSchema.length) {
-  row(to) = value
-}
-i += 1
-  }
-  row
-}
-  }
-
-  private def convertWithParseMode(
-  tokens: Array[String])(convert: Array[String] => InternalRow): 
Option[InternalRow] = {
-if (options.dropMalformed && dataSchema.length != tokens.length) {
-  if (numMalformedRecords < options.maxMalformedLogPerPartition) {
-logWarning(s"Dropping malformed line: 
${tokens.mkString(options.delimiter.toString)}")
-  }
-  if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) {
-logWarning(
-  s"More than ${options.maxMalformedLogPerPartition} malformed 
records have been " +
-"found on this partition. Malformed records from now on will 
not be logged.")
+  def parse(input: String): InternalRow = 
convert(tokenizer.parseLine(input))
+
+  private def convert(tokens: Array[String]): InternalRow = {
+if (tokens.length != schema.length) {
+  // If the number of tokens doesn't match the schema, we should treat 
it as a malformed record.
+  // However, we still have chance to parse some of the tokens, by 
adding extra null tokens in
+  // the tail if the number is smaller, or by dropping extra tokens if 
the number is larger.
+  val checkedTokens = if (schema.length > tokens.length) {
+tokens ++ new Array[String](schema.length - tokens.length)
+  } else {
+tokens.take(schema.length)
   }
-  numMalformedRecords += 1
-  None
-} else if (options.failFast && dataSchema.length != tokens.length) {
-  throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
-s"${tokens.mkString(options.delimiter.toString)}")
-} else {
-  // If a length of parsed tokens is not equal to expected one, it 
makes the length the same
-  // with the expected. If the length is shorter, it adds extra tokens 
in the tail.
-  // If longer, it drops extra tokens.
-  //
-  // TODO: Revisit this; if a length of tokens does not match an 
expected length in the schema,
-  // we probably need to treat it as a malformed record.
-  // See an URL below for related discussions:
-  // https://github.com/apache/spark/pull/16928#discussion_r102657214
-  val checkedTokens = if (options.permissive && dataSchema.length != 
tokens.length) {
-if (dataSchema.length > tokens.length) {
-  tokens ++ new Array[String](dataSchema.length - tokens.length)
-} else {
-  tokens.take(dataSchema.length)
+  def getPartialResult(): Option[InternalRow] = {
+try {
+  Some(convert(checkedTokens))
+} catch {
+  case _: BadRecordException => None
 }
-  } else {
-tokens
   }
-
+  throw BadRecordException(
+() => getCurrentInput(),
+getPartialResult,
+new RuntimeException("Malformed CSV record"))
--- End diff --

yes





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-17 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106768186
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+func: IN => Seq[InternalRow],
+mode: String,
+schema: StructType,
+columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
+if (corruptFieldIndex.isDefined) {
+  (row, badRecord) => {
+for ((f, i) <- actualSchema.zipWithIndex) {
+  resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, 
f.dataType)).orNull
+}
+resultRow(corruptFieldIndex.get) = badRecord()
+resultRow
+  }
+} else {
+  (row, badRecord) => row.getOrElse {
+for (i <- schema.indices) resultRow.setNullAt(i)
--- End diff --

I ran some tests with the code below to help.

```scala
object ForWhile {
  def forloop = {
val l = Array[Int](1,2,3)
for (i <- l) {
}
  }

  def whileloop = {
val arr = Array[Int](1,2,3)
var idx = 0
while(idx < arr.length) {
  idx += 1
}
  }
}
```

```
Compiled from "ForWhile.scala"
public final class ForWhile {
  public static void whileloop();
Code:
   0: getstatic #16 // Field 
ForWhile$.MODULE$:LForWhile$;
   3: invokevirtual #18 // Method 
ForWhile$.whileloop:()V
   6: return

  public static void forloop();
Code:
   0: getstatic #16 // Field 
ForWhile$.MODULE$:LForWhile$;
   3: invokevirtual #21 // Method ForWhile$.forloop:()V
   6: return
}

Compiled from "ForWhile.scala"
public final class ForWhile$ {
  public static final ForWhile$ MODULE$;

  public static {};
Code:
   0: new   #2  // class ForWhile$
   3: invokespecial #12 // Method "":()V
   6: return

  public void forloop();
Code:
   0: getstatic #18 // Field 
scala/Array$.MODULE$:Lscala/Array$;
   3: getstatic #23 // Field 
scala/Predef$.MODULE$:Lscala/Predef$;
   6: iconst_3
   7: newarray   int
   9: dup
  10: iconst_0
  11: iconst_1
  12: iastore
  13: dup
  14: iconst_1
  15: iconst_2
  16: iastore
  17: dup
  18: iconst_2
  19: iconst_3
  20: iastore
  21: invokevirtual #27 // Method 
scala/Predef$.wrapIntArray:([I)Lscala/collection/mutable/WrappedArray;
  24: getstatic #32 // Field 
scala/reflect/ClassTag$.MODULE$:Lscala/reflect/ClassTag$;
  27: invokevirtual #36 // Method 
scala/reflect/ClassTag$.Int:()Lscala/reflect/ClassTag;
  30: invokevirtual #40 // Method 
scala/Array$.apply:(Lscala/collection/Seq;Lscala/reflect/ClassTag;)Ljava/lang/Object;
  33: checkcast #42 // class "[I"
  36: astore_1
  37: getstatic #23 // Field 
scala/Predef$.MODULE$:Lscala/Predef$;
  40: aload_1
  41: 
```

[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-17 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106767486
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+func: IN => Seq[InternalRow],
+mode: String,
+schema: StructType,
+columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
+if (corruptFieldIndex.isDefined) {
+  (row, badRecord) => {
+for ((f, i) <- actualSchema.zipWithIndex) {
+  resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, 
f.dataType)).orNull
+}
+resultRow(corruptFieldIndex.get) = badRecord()
+resultRow
+  }
+} else {
+  (row, badRecord) => row.getOrElse {
+for (i <- schema.indices) resultRow.setNullAt(i)
--- End diff --

I ran some tests. I can see boxing/unboxing that does not seem to occur in the while loop.

```scala
object ForWhile {
  def forloop = {
val l = Seq(1,2,3).indices
for (i <- l) println(i)
  }
}
```

```
2: invokestatic  #46 // Method 
scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
```

```
4: invokestatic  #37 // Method 
scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
```







[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-17 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106650196
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+func: IN => Seq[InternalRow],
+mode: String,
+schema: StructType,
+columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
+if (corruptFieldIndex.isDefined) {
+  (row, badRecord) => {
+for ((f, i) <- actualSchema.zipWithIndex) {
--- End diff --

Here too. I think this case applies to https://github.com/databricks/scala-style-guide#traversal-and-zipwithindex.





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-17 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106664841
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+func: IN => Seq[InternalRow],
+mode: String,
+schema: StructType,
+columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
+if (corruptFieldIndex.isDefined) {
+  (row, badRecord) => {
+for ((f, i) <- actualSchema.zipWithIndex) {
+  resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, 
f.dataType)).orNull
+}
+resultRow(corruptFieldIndex.get) = badRecord()
+resultRow
+  }
+} else {
+  (row, badRecord) => row.getOrElse {
+for (i <- schema.indices) resultRow.setNullAt(i)
+resultRow
+  }
+}
+  }
+
+  def parse(input: IN): Iterator[InternalRow] = {
+try {
+  func(input).toIterator.map(row => toResultRow(Some(row), () => null))
+} catch {
+  case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+Iterator(toResultRow(e.partialResult(), e.record))
+  case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+Iterator.empty
+  // If the parse mode is FAIL FAST, do not catch the exception.
+}
+  }
+}
+
+case class BadRecordException(
+record: () => UTF8String,
+partialResult: () => Option[InternalRow],
--- End diff --

I think we could leave some comments for `partialResult` too. 





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-17 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106664412
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+func: IN => Seq[InternalRow],
+mode: String,
+schema: StructType,
+columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
--- End diff --

Maybe we should explain what happens within `toResultRow`. This might also be okay to include in the follow-up.





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-17 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106655740
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 ---
@@ -233,81 +187,39 @@ class UnivocityParser(
* Parses a single CSV string and turns it into either one resulting row 
or no row (if the
* the record is malformed).
*/
-  def parse(input: String): Option[InternalRow] = 
convert(tokenizer.parseLine(input))
-
-  private def convert(tokens: Array[String]): Option[InternalRow] = {
-convertWithParseMode(tokens) { tokens =>
-  var i: Int = 0
-  while (i < tokenIndexArr.length) {
-// It anyway needs to try to parse since it decides if this row is 
malformed
-// or not after trying to cast in `DROPMALFORMED` mode even if the 
casted
-// value is not stored in the row.
-val from = tokenIndexArr(i)
-val to = rowIndexArr(i)
-val value = valueConverters(from).apply(tokens(from))
-if (i < requiredSchema.length) {
-  row(to) = value
-}
-i += 1
-  }
-  row
-}
-  }
-
-  private def convertWithParseMode(
-  tokens: Array[String])(convert: Array[String] => InternalRow): 
Option[InternalRow] = {
-if (options.dropMalformed && dataSchema.length != tokens.length) {
-  if (numMalformedRecords < options.maxMalformedLogPerPartition) {
-logWarning(s"Dropping malformed line: 
${tokens.mkString(options.delimiter.toString)}")
-  }
-  if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) {
-logWarning(
-  s"More than ${options.maxMalformedLogPerPartition} malformed 
records have been " +
-"found on this partition. Malformed records from now on will 
not be logged.")
+  def parse(input: String): InternalRow = 
convert(tokenizer.parseLine(input))
+
+  private def convert(tokens: Array[String]): InternalRow = {
+if (tokens.length != schema.length) {
+  // If the number of tokens doesn't match the schema, we should treat 
it as a malformed record.
+  // However, we still have chance to parse some of the tokens, by 
adding extra null tokens in
+  // the tail if the number is smaller, or by dropping extra tokens if 
the number is larger.
+  val checkedTokens = if (schema.length > tokens.length) {
+tokens ++ new Array[String](schema.length - tokens.length)
+  } else {
+tokens.take(schema.length)
   }
-  numMalformedRecords += 1
-  None
-} else if (options.failFast && dataSchema.length != tokens.length) {
-  throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
-s"${tokens.mkString(options.delimiter.toString)}")
-} else {
-  // If a length of parsed tokens is not equal to expected one, it 
makes the length the same
-  // with the expected. If the length is shorter, it adds extra tokens 
in the tail.
-  // If longer, it drops extra tokens.
-  //
-  // TODO: Revisit this; if a length of tokens does not match an 
expected length in the schema,
-  // we probably need to treat it as a malformed record.
-  // See an URL below for related discussions:
-  // https://github.com/apache/spark/pull/16928#discussion_r102657214
-  val checkedTokens = if (options.permissive && dataSchema.length != 
tokens.length) {
-if (dataSchema.length > tokens.length) {
-  tokens ++ new Array[String](dataSchema.length - tokens.length)
-} else {
-  tokens.take(dataSchema.length)
+  def getPartialResult(): Option[InternalRow] = {
+try {
+  Some(convert(checkedTokens))
+} catch {
+  case _: BadRecordException => None
 }
-  } else {
-tokens
   }
-
+  throw BadRecordException(
+() => getCurrentInput(),
+getPartialResult,
+new RuntimeException("Malformed CSV record"))
--- End diff --

I see. So, `BadRecordException` contains the information about which record 
is malformed so that users can check this. Did I understand correctly?




[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-17 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106653556
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -55,108 +52,6 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
-  private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
-
-  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
-  corruptFieldIndex.foreach { corrFieldIndex =>
-require(schema(corrFieldIndex).dataType == StringType)
-require(schema(corrFieldIndex).nullable)
-  }
-
-  @transient
-  private[this] var isWarningPrinted: Boolean = false
-
-  @transient
-  private def printWarningForMalformedRecord(record: () => UTF8String): 
Unit = {
-def sampleRecord: String = {
-  if (options.wholeFile) {
-""
-  } else {
-s"Sample record: ${record()}\n"
-  }
-}
-
-def footer: String = {
--- End diff --

It seems the warnings are not printed for either format here. Should we maybe consider printing shortened versions of the messages for CSV and JSON?





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-17 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106648055
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+func: IN => Seq[InternalRow],
+mode: String,
+schema: StructType,
+columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
+if (corruptFieldIndex.isDefined) {
+  (row, badRecord) => {
+for ((f, i) <- actualSchema.zipWithIndex) {
+  resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, 
f.dataType)).orNull
+}
+resultRow(corruptFieldIndex.get) = badRecord()
+resultRow
+  }
+} else {
+  (row, badRecord) => row.getOrElse {
+for (i <- schema.indices) resultRow.setNullAt(i)
--- End diff --

To my understanding, the last commit focuses on simplification. I like that, but I think maybe we should use `while` here instead, unless we are very sure that the bytecode is virtually the same or more efficient in this critical path. This change would not harm the simplification much here; a sketch of the `while` form follows below.
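
For reference, a minimal, self-contained sketch of the `while`-style rewrite being suggested here, using a plain array as a stand-in for `GenericInternalRow` (the object and method names are made up for illustration; this is not code from the PR):

```scala
object NullOutSketch {
  // Same effect as `for (i <- schema.indices) resultRow.setNullAt(i)`, but written
  // as a while loop so that no Range or per-element closure is allocated.
  def nullOutAll(resultRow: Array[Any]): Unit = {
    var i = 0
    while (i < resultRow.length) {
      resultRow(i) = null
      i += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val row: Array[Any] = Array(1, "a", 2.0)
    nullOutAll(row)
    println(row.mkString(", "))  // prints: null, null, null
  }
}
```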





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106593549
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -65,7 +65,7 @@ private[sql] class JSONOptions(
   val allowBackslashEscapingAnyCharacter =
 
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
   val compressionCodec = 
parameters.get("compression").map(CompressionCodecs.getCodecClassName)
-  private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
--- End diff --

Yeah, this can be a good follow-up.





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-16 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106570171
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -65,7 +65,7 @@ private[sql] class JSONOptions(
   val allowBackslashEscapingAnyCharacter =
 
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
   val compressionCodec = 
parameters.get("compression").map(CompressionCodecs.getCodecClassName)
-  private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
--- End diff --

How about creating an enum, like what we are doing for `SaveMode`?
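
For illustration, one possible shape for such an enum, following the sealed-trait pattern (the names below are hypothetical and not part of this PR):

```scala
sealed trait ParseMode
case object PermissiveMode extends ParseMode
case object DropMalformedMode extends ParseMode
case object FailFastMode extends ParseMode

object ParseMode {
  // One possible default: fall back to PERMISSIVE for unrecognized values.
  def fromString(mode: String): ParseMode = mode.toUpperCase match {
    case "DROPMALFORMED" => DropMalformedMode
    case "FAILFAST"      => FailFastMode
    case _               => PermissiveMode
  }
}
```

Such a type could replace string checks like `ParseModes.isPermissiveMode` and `ParseModes.isDropMalformedMode` with exhaustive pattern matches.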





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-16 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/17315#discussion_r106566183
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -391,9 +288,9 @@ class JacksonParser(
 
 case token =>
   // We cannot parse this token based on the given data type. So, we 
throw a
-  // SparkSQLJsonProcessingException and this exception will be caught 
by
+  // SparkSQLRuntimeException and this exception will be caught by
--- End diff --

`RuntimeException`?





[GitHub] spark pull request #17315: [SPARK-19949][SQL] unify bad record handling in C...

2017-03-16 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/17315

[SPARK-19949][SQL] unify bad record handling in CSV and JSON

## What changes were proposed in this pull request?

Currently JSON and CSV have exactly the same logic for handling bad records. This PR abstracts it and puts it at an upper level to reduce code duplication.

The overall idea is that we make the JSON and CSV parsers throw a BadRecordException, and the upper level, FailureSafeParser, handles bad records according to the parse mode.

Behavior changes:
1. With PERMISSIVE mode, if the number of tokens doesn't match the schema, the CSV parser previously treated the record as legal and parsed as many tokens as possible. After this PR, we treat it as a malformed record and put the raw record string in a special column, but we still parse as many tokens as possible (see the sketch after this list).
2. All logging is removed, as it is not very useful in practice.
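
For a concrete picture of behavior change 1, a hedged, self-contained sketch follows (the file path, data, and column names are invented for illustration and are not taken from this PR's tests):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object PermissiveCsvSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("permissive-csv").getOrCreate()

    // Hypothetical input file /tmp/example.csv with two lines:
    //   1,2   <- matches the two data columns
    //   1     <- too few tokens, now treated as a malformed record
    val schema = new StructType()
      .add("a", IntegerType)
      .add("b", IntegerType)
      .add("_corrupt_record", StringType)

    val df = spark.read
      .schema(schema)
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .csv("/tmp/example.csv")

    df.show()
    // Expected after this PR:
    //   (1, 2, null)   -- well-formed record, corrupt-record column stays null
    //   (1, null, "1") -- the raw record is kept in _corrupt_record while the
    //                     parsable token is still filled in
    spark.stop()
  }
}
```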

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark bad-record2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17315.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17315


commit adfde77125eb31b262a2f010851beef2b872e1e8
Author: Wenchen Fan 
Date:   2017-03-16T13:52:59Z

unify bad record handling in CSV and JSON



