Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17315#discussion_r106768186
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.unsafe.types.UTF8String
    +
    +class FailureSafeParser[IN](
    +    func: IN => Seq[InternalRow],
    +    mode: String,
    +    schema: StructType,
    +    columnNameOfCorruptRecord: String) {
    +
    +  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
    +  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
    +  private val resultRow = new GenericInternalRow(schema.length)
    +
    +  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
    +    if (corruptFieldIndex.isDefined) {
    +      (row, badRecord) => {
    +        for ((f, i) <- actualSchema.zipWithIndex) {
    +          resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, 
f.dataType)).orNull
    +        }
    +        resultRow(corruptFieldIndex.get) = badRecord()
    +        resultRow
    +      }
    +    } else {
    +      (row, badRecord) => row.getOrElse {
    +        for (i <- schema.indices) resultRow.setNullAt(i)
    --- End diff --
    
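    I ran some tests with the code below to help. As the disassembly shows, the `for` loop compiles to a call to `ArrayOps.foreach` plus a separately allocated anonymous closure class (`ForWhile$$anonfun$forloop$1`), whereas the `while` loop compiles to a plain counter loop (`iload`/`if_icmpge`/`goto`) with no closure at all.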
    
    ```scala
    // Minimal example comparing how scalac compiles a `for` loop versus a
    // `while` loop over an Array. The disassembly below is the compiler
    // output for exactly this source, so the code itself is kept as-is.
    object ForWhile {
      // `for (i <- l)` over an Array: per the disassembly, this wraps the
      // array with Predef.intArrayOps and calls ArrayOps.foreach, allocating
      // an anonymous Function1 (ForWhile$$anonfun$forloop$1) on every call.
      def forloop = {
        val l = Array[Int](1,2,3)
        for (i <- l) {
        }
      }
    
      // Hand-written index loop: per the disassembly, this compiles to an
      // iload/arraylength/if_icmpge/goto counter loop with no closure or
      // ArrayOps wrapper. The body only advances the index; elements are not read.
      def whileloop = {
        val arr = Array[Int](1,2,3)
        var idx = 0
        while(idx < arr.length) {
          idx += 1
        }
      }
    }
    ```
    
    ```
    Compiled from "ForWhile.scala"
    public final class ForWhile {
      public static void whileloop();
        Code:
           0: getstatic     #16                 // Field ForWhile$.MODULE$:LForWhile$;
           3: invokevirtual #18                 // Method ForWhile$.whileloop:()V
           6: return
    
      public static void forloop();
        Code:
           0: getstatic     #16                 // Field ForWhile$.MODULE$:LForWhile$;
           3: invokevirtual #21                 // Method ForWhile$.forloop:()V
           6: return
    }
    
    Compiled from "ForWhile.scala"
    public final class ForWhile$ {
      public static final ForWhile$ MODULE$;
    
      public static {};
        Code:
           0: new           #2                  // class ForWhile$
           3: invokespecial #12                 // Method "<init>":()V
           6: return
    
      public void forloop();
        Code:
           0: getstatic     #18                 // Field scala/Array$.MODULE$:Lscala/Array$;
           3: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
           6: iconst_3
           7: newarray       int
           9: dup
          10: iconst_0
          11: iconst_1
          12: iastore
          13: dup
          14: iconst_1
          15: iconst_2
          16: iastore
          17: dup
          18: iconst_2
          19: iconst_3
          20: iastore
          21: invokevirtual #27                 // Method scala/Predef$.wrapIntArray:([I)Lscala/collection/mutable/WrappedArray;
          24: getstatic     #32                 // Field scala/reflect/ClassTag$.MODULE$:Lscala/reflect/ClassTag$;
          27: invokevirtual #36                 // Method scala/reflect/ClassTag$.Int:()Lscala/reflect/ClassTag;
          30: invokevirtual #40                 // Method scala/Array$.apply:(Lscala/collection/Seq;Lscala/reflect/ClassTag;)Ljava/lang/Object;
          33: checkcast     #42                 // class "[I"
          36: astore_1
          37: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
          40: aload_1
          41: invokevirtual #46                 // Method scala/Predef$.intArrayOps:([I)Lscala/collection/mutable/ArrayOps;
          44: new           #48                 // class ForWhile$$anonfun$forloop$1
          47: dup
          48: invokespecial #49                 // Method ForWhile$$anonfun$forloop$1."<init>":()V
          51: invokeinterface #55,  2           // InterfaceMethod scala/collection/mutable/ArrayOps.foreach:(Lscala/Function1;)V
          56: return
    
      public void whileloop();
        Code:
           0: getstatic     #18                 // Field scala/Array$.MODULE$:Lscala/Array$;
           3: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
           6: iconst_3
           7: newarray       int
           9: dup
          10: iconst_0
          11: iconst_1
          12: iastore
          13: dup
          14: iconst_1
          15: iconst_2
          16: iastore
          17: dup
          18: iconst_2
          19: iconst_3
          20: iastore
          21: invokevirtual #27                 // Method scala/Predef$.wrapIntArray:([I)Lscala/collection/mutable/WrappedArray;
          24: getstatic     #32                 // Field scala/reflect/ClassTag$.MODULE$:Lscala/reflect/ClassTag$;
          27: invokevirtual #36                 // Method scala/reflect/ClassTag$.Int:()Lscala/reflect/ClassTag;
          30: invokevirtual #40                 // Method scala/Array$.apply:(Lscala/collection/Seq;Lscala/reflect/ClassTag;)Ljava/lang/Object;
          33: checkcast     #42                 // class "[I"
          36: astore_1
          37: iconst_0
          38: istore_2
          39: iload_2
          40: aload_1
          41: arraylength
          42: if_icmpge     52
          45: iload_2
          46: iconst_1
          47: iadd
          48: istore_2
          49: goto          39
          52: return
    }
    
    Compiled from "ForWhile.scala"
    public final class ForWhile$$anonfun$forloop$1 extends scala.runtime.AbstractFunction1$mcVI$sp implements scala.Serializable {
      public static final long serialVersionUID;
    
      public final void apply(int);
        Code:
           0: aload_0
           1: iload_1
           2: invokevirtual #21                 // Method apply$mcVI$sp:(I)V
           5: return
    
      public void apply$mcVI$sp(int);
        Code:
           0: return
    
      public final java.lang.Object apply(java.lang.Object);
        Code:
           0: aload_0
           1: aload_1
           2: invokestatic  #32                 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
           5: invokevirtual #34                 // Method apply:(I)V
           8: getstatic     #40                 // Field scala/runtime/BoxedUnit.UNIT:Lscala/runtime/BoxedUnit;
          11: areturn
    
      public ForWhile$$anonfun$forloop$1();
        Code:
           0: aload_0
           1: invokespecial #45                 // Method scala/runtime/AbstractFunction1$mcVI$sp."<init>":()V
           4: return
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to