Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/17315#discussion_r106768186
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+ func: IN => Seq[InternalRow],
+ mode: String,
+ schema: StructType,
+ columnNameOfCorruptRecord: String) {
+
+ private val corruptFieldIndex =
schema.getFieldIndex(columnNameOfCorruptRecord)
+ private val actualSchema = StructType(schema.filterNot(_.name ==
columnNameOfCorruptRecord))
+ private val resultRow = new GenericInternalRow(schema.length)
+
+ private val toResultRow: (Option[InternalRow], () => UTF8String) =>
InternalRow = {
+ if (corruptFieldIndex.isDefined) {
+ (row, badRecord) => {
+ for ((f, i) <- actualSchema.zipWithIndex) {
+ resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i,
f.dataType)).orNull
+ }
+ resultRow(corruptFieldIndex.get) = badRecord()
+ resultRow
+ }
+ } else {
+ (row, badRecord) => row.getOrElse {
+ for (i <- schema.indices) resultRow.setNullAt(i)
--- End diff --
I ran some tests with the code below to help.
```scala
// Minimal pair used to compare the bytecode emitted for a `for` loop and a `while` loop
// over the same Int array.
object ForWhile {
// Iterates over a three-element Int array with a `for` loop whose body is empty.
// In the javap listing below, this compiles to an `ArrayOps.foreach` call that is
// handed a newly allocated anonymous function object.
def forloop = {
val l = Array[Int](1,2,3)
for (i <- l) {
}
}
// Walks an identical array with a manually incremented index and a `while` loop.
// In the javap listing below, this compiles to a plain compare-and-jump loop with
// no function object involved.
def whileloop = {
val arr = Array[Int](1,2,3)
var idx = 0
while(idx < arr.length) {
idx += 1
}
}
}
```
```
Compiled from "ForWhile.scala"
public final class ForWhile {
public static void whileloop();
Code:
0: getstatic #16 // Field
ForWhile$.MODULE$:LForWhile$;
3: invokevirtual #18 // Method
ForWhile$.whileloop:()V
6: return
public static void forloop();
Code:
0: getstatic #16 // Field
ForWhile$.MODULE$:LForWhile$;
3: invokevirtual #21 // Method ForWhile$.forloop:()V
6: return
}
Compiled from "ForWhile.scala"
public final class ForWhile$ {
public static final ForWhile$ MODULE$;
public static {};
Code:
0: new #2 // class ForWhile$
3: invokespecial #12 // Method "<init>":()V
6: return
public void forloop();
Code:
0: getstatic #18 // Field
scala/Array$.MODULE$:Lscala/Array$;
3: getstatic #23 // Field
scala/Predef$.MODULE$:Lscala/Predef$;
6: iconst_3
7: newarray int
9: dup
10: iconst_0
11: iconst_1
12: iastore
13: dup
14: iconst_1
15: iconst_2
16: iastore
17: dup
18: iconst_2
19: iconst_3
20: iastore
21: invokevirtual #27 // Method
scala/Predef$.wrapIntArray:([I)Lscala/collection/mutable/WrappedArray;
24: getstatic #32 // Field
scala/reflect/ClassTag$.MODULE$:Lscala/reflect/ClassTag$;
27: invokevirtual #36 // Method
scala/reflect/ClassTag$.Int:()Lscala/reflect/ClassTag;
30: invokevirtual #40 // Method
scala/Array$.apply:(Lscala/collection/Seq;Lscala/reflect/ClassTag;)Ljava/lang/Object;
33: checkcast #42 // class "[I"
36: astore_1
37: getstatic #23 // Field
scala/Predef$.MODULE$:Lscala/Predef$;
40: aload_1
41: invokevirtual #46 // Method
scala/Predef$.intArrayOps:([I)Lscala/collection/mutable/ArrayOps;
44: new #48 // class
ForWhile$$anonfun$forloop$1
47: dup
48: invokespecial #49 // Method
ForWhile$$anonfun$forloop$1."<init>":()V
51: invokeinterface #55, 2 // InterfaceMethod
scala/collection/mutable/ArrayOps.foreach:(Lscala/Function1;)V
56: return
public void whileloop();
Code:
0: getstatic #18 // Field
scala/Array$.MODULE$:Lscala/Array$;
3: getstatic #23 // Field
scala/Predef$.MODULE$:Lscala/Predef$;
6: iconst_3
7: newarray int
9: dup
10: iconst_0
11: iconst_1
12: iastore
13: dup
14: iconst_1
15: iconst_2
16: iastore
17: dup
18: iconst_2
19: iconst_3
20: iastore
21: invokevirtual #27 // Method
scala/Predef$.wrapIntArray:([I)Lscala/collection/mutable/WrappedArray;
24: getstatic #32 // Field
scala/reflect/ClassTag$.MODULE$:Lscala/reflect/ClassTag$;
27: invokevirtual #36 // Method
scala/reflect/ClassTag$.Int:()Lscala/reflect/ClassTag;
30: invokevirtual #40 // Method
scala/Array$.apply:(Lscala/collection/Seq;Lscala/reflect/ClassTag;)Ljava/lang/Object;
33: checkcast #42 // class "[I"
36: astore_1
37: iconst_0
38: istore_2
39: iload_2
40: aload_1
41: arraylength
42: if_icmpge 52
45: iload_2
46: iconst_1
47: iadd
48: istore_2
49: goto 39
52: return
}
Compiled from "ForWhile.scala"
public final class ForWhile$$anonfun$forloop$1 extends
scala.runtime.AbstractFunction1$mcVI$sp implements scala.Serializable {
public static final long serialVersionUID;
public final void apply(int);
Code:
0: aload_0
1: iload_1
2: invokevirtual #21 // Method apply$mcVI$sp:(I)V
5: return
public void apply$mcVI$sp(int);
Code:
0: return
public final java.lang.Object apply(java.lang.Object);
Code:
0: aload_0
1: aload_1
2: invokestatic #32 // Method
scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
5: invokevirtual #34 // Method apply:(I)V
8: getstatic #40 // Field
scala/runtime/BoxedUnit.UNIT:Lscala/runtime/BoxedUnit;
11: areturn
public ForWhile$$anonfun$forloop$1();
Code:
0: aload_0
1: invokespecial #45 // Method
scala/runtime/AbstractFunction1$mcVI$sp."<init>":()V
4: return
}
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]